Browse Source

AMBARI-10409: [WinTP2] Merge HDPWIN HDFS package scripts into common services (jluniya)

Jayush Luniya 10 năm trước cách đây
mục cha
commit
e989ec0e5f
27 tập tin đã thay đổi với 816 bổ sung1201 xóa
  1. 29 35
      ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode.py
  2. 35 1
      ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs.py
  3. 19 14
      ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_client.py
  4. 21 4
      ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_datanode.py
  5. 62 14
      ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py
  6. 21 3
      ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_snamenode.py
  7. 30 6
      ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py
  8. 84 48
      ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py
  9. 4 379
      ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params.py
  10. 399 0
      ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py
  11. 2 0
      ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_windows.py
  12. 37 0
      ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/service_check.py
  13. 21 22
      ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/snamenode.py
  14. 24 17
      ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/status_params.py
  15. 27 9
      ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/zkfc_slave.py
  16. 0 49
      ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/datanode.py
  17. 0 54
      ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/hdfs.py
  18. 0 41
      ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/hdfs_client.py
  19. 0 130
      ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/hdfs_rebalance.py
  20. 0 48
      ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/journalnode.py
  21. 0 128
      ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/namenode.py
  22. 0 55
      ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/service_check.py
  23. 0 24
      ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/service_mapping.py
  24. 0 48
      ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/snamenode.py
  25. 0 51
      ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/zkfc_slave.py
  26. 0 21
      ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/templates/exclude_hosts_list.j2
  27. 1 0
      ambari-server/src/test/python/stacks/2.0.6/HDFS/test_hdfs_client.py

+ 29 - 35
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode.py

@@ -23,51 +23,30 @@ from resource_management.libraries.functions.version import compare_versions, fo
 from resource_management.libraries.functions.security_commons import build_expectations, \
   cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, FILE_TYPE_XML
 from hdfs import hdfs
-
+from ambari_commons.os_family_impl import OsFamilyImpl
+from ambari_commons import OSConst
 
 class DataNode(Script):
-
-  def get_stack_to_component(self):
-    return {"HDP": "hadoop-hdfs-datanode"}
-
   def install(self, env):
     import params
-
     self.install_packages(env, params.exclude_packages)
     env.set_params(params)
 
-
-  def pre_rolling_restart(self, env):
-    Logger.info("Executing DataNode Rolling Upgrade pre-restart")
-    import params
-    env.set_params(params)
-
-    if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:
-      Execute(format("hdp-select set hadoop-hdfs-datanode {version}"))
-
-
-  def post_rolling_restart(self, env):
-    Logger.info("Executing DataNode Rolling Upgrade post-restart")
+  def configure(self, env):
     import params
     env.set_params(params)
-
-    # ensure the DataNode has started and rejoined the cluster
-    datanode_upgrade.post_upgrade_check()
-
+    hdfs("datanode")
+    datanode(action="configure")
 
   def start(self, env, rolling_restart=False):
     import params
-
     env.set_params(params)
     self.configure(env)
     datanode(action="start")
 
-
   def stop(self, env, rolling_restart=False):
     import params
-
     env.set_params(params)
-
     # pre-upgrade steps shutdown the datanode, so there's no need to call
     # action=stop
     if rolling_restart:
@@ -75,19 +54,31 @@ class DataNode(Script):
     else:
       datanode(action="stop")
 
+  def status(self, env):
+    import status_params
+    env.set_params(status_params)
+    datanode(action = "status")
 
-  def configure(self, env):
-    import params
-    env.set_params(params)
-    hdfs()
-    datanode(action="configure")
 
+@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
+class DataNodeDefault(DataNode):
 
-  def status(self, env):
-    import status_params
+  def get_stack_to_component(self):
+    return {"HDP": "hadoop-hdfs-datanode"}
 
-    env.set_params(status_params)
-    check_process_status(status_params.datanode_pid_file)
+  def pre_rolling_restart(self, env):
+    Logger.info("Executing DataNode Rolling Upgrade pre-restart")
+    import params
+    env.set_params(params)
+    if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:
+      Execute(format("hdp-select set hadoop-hdfs-datanode {version}"))
+
+  def post_rolling_restart(self, env):
+    Logger.info("Executing DataNode Rolling Upgrade post-restart")
+    import params
+    env.set_params(params)
+    # ensure the DataNode has started and rejoined the cluster
+    datanode_upgrade.post_upgrade_check()
 
   def security_status(self, env):
     import status_params
@@ -147,6 +138,9 @@ class DataNode(Script):
     else:
       self.put_structured_out({"securityState": "UNSECURED"})
 
+@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
+class DataNodeWindows(DataNode):
+  pass
 
 if __name__ == "__main__":
   DataNode().execute()

+ 35 - 1
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs.py

@@ -22,8 +22,10 @@ Ambari Agent
 from resource_management import *
 import sys
 import os
+from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl
+from ambari_commons import OSConst
 
-
+@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
 def hdfs(name=None):
   import params
 
@@ -81,3 +83,35 @@ def hdfs(name=None):
   
   if params.lzo_enabled and len(params.lzo_packages) > 0:
       Package(params.lzo_packages)
+
+@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
+def hdfs(component=None):
+  import params
+  if component == "namenode":
+    directories = params.dfs_name_dir.split(",")
+    Directory(directories,
+              owner=params.hdfs_user,
+              mode="(OI)(CI)F",
+              recursive=True
+    )
+    File(params.exclude_file_path,
+         content=Template("exclude_hosts_list.j2"),
+         owner=params.hdfs_user,
+         mode="f",
+         )
+  if "hadoop-policy" in params.config['configurations']:
+    XmlConfig("hadoop-policy.xml",
+              conf_dir=params.hadoop_conf_dir,
+              configurations=params.config['configurations']['hadoop-policy'],
+              owner=params.hdfs_user,
+              mode="f",
+              configuration_attributes=params.config['configuration_attributes']['hadoop-policy']
+    )
+
+  XmlConfig("hdfs-site.xml",
+            conf_dir=params.hadoop_conf_dir,
+            configurations=params.config['configurations']['hdfs-site'],
+            owner=params.hdfs_user,
+            mode="f",
+            configuration_attributes=params.config['configuration_attributes']['hdfs-site']
+  )

+ 19 - 14
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_client.py

@@ -22,43 +22,44 @@ from resource_management.libraries.functions.security_commons import build_expec
   cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \
   FILE_TYPE_XML
 from hdfs import hdfs
-from utils import service
-
+from ambari_commons.os_family_impl import OsFamilyImpl
+from ambari_commons import OSConst
 
 class HdfsClient(Script):
 
-  def get_stack_to_component(self):
-    return {"HDP": "hadoop-client"}
-
   def install(self, env):
     import params
-
     self.install_packages(env, params.exclude_packages)
     env.set_params(params)
-    self.config(env)
+    self.configure(env)
 
-  def pre_rolling_restart(self, env):
+  def configure(self, env):
     import params
     env.set_params(params)
-    if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:
-      Execute(format("hdp-select set hadoop-client {version}"))
+    hdfs()
 
   def start(self, env, rolling_restart=False):
     import params
-
     env.set_params(params)
 
   def stop(self, env, rolling_restart=False):
     import params
-
     env.set_params(params)
 
   def status(self, env):
     raise ClientComponentHasNoStatus()
 
-  def config(self, env):
+@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
+class HdfsClientDefault(HdfsClient):
+
+  def get_stack_to_component(self):
+    return {"HDP": "hadoop-client"}
+
+  def pre_rolling_restart(self, env):
     import params
-    hdfs()
+    env.set_params(params)
+    if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:
+      Execute(format("hdp-select set hadoop-client {version}"))
 
   def security_status(self, env):
     import status_params
@@ -105,5 +106,9 @@ class HdfsClient(Script):
     else:
       self.put_structured_out({"securityState": "UNSECURED"})
 
+@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
+class HdfsClientWindows(HdfsClient):
+  pass
+
 if __name__ == "__main__":
   HdfsClient().execute()

+ 21 - 4
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_datanode.py

@@ -20,6 +20,8 @@ limitations under the License.
 from resource_management import *
 from resource_management.libraries.functions.dfs_datanode_helper import handle_dfs_data_dir
 from utils import service
+from ambari_commons.os_family_impl import OsFamilyImpl, OsFamilyFuncImpl
+from ambari_commons import OSConst
 
 
 def create_dirs(data_dir, params):
@@ -36,10 +38,10 @@ def create_dirs(data_dir, params):
             ignore_failures=True
   )
 
-
+@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
 def datanode(action=None):
-  import params
   if action == "configure":
+    import params
     Directory(params.dfs_domain_socket_dir,
               recursive=True,
               mode=0751,
@@ -47,8 +49,8 @@ def datanode(action=None):
               group=params.user_group)
 
     handle_dfs_data_dir(create_dirs, params)
-
   elif action == "start" or action == "stop":
+    import params
     Directory(params.hadoop_pid_dir_prefix,
               mode=0755,
               owner=params.hdfs_user,
@@ -59,4 +61,19 @@ def datanode(action=None):
       user=params.hdfs_user,
       create_pid_dir=True,
       create_log_dir=True
-    )
+    )
+  elif action == "status":
+    import status_params
+    check_process_status(status_params.datanode_pid_file)
+
+
+@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
+def datanode(action=None):
+  if action == "configure":
+    pass
+  elif(action == "start" or action == "stop"):
+    import params
+    Service(params.datanode_win_service_name, action=action)
+  elif action == "status":
+    import status_params
+    check_windows_service_status(status_params.datanode_win_service_name)

+ 62 - 14
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py

@@ -21,18 +21,22 @@ import os.path
 from resource_management import *
 from resource_management.core.logger import Logger
 from resource_management.core.exceptions import ComponentIsNotRunning
-
+from resource_management.libraries.functions.check_process_status import check_process_status
+from ambari_commons.os_family_impl import OsFamilyImpl, OsFamilyFuncImpl
+from ambari_commons import OSConst
 from utils import service, safe_zkfc_op
+from setup_ranger_hdfs import setup_ranger_hdfs
 
-
+@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
 def namenode(action=None, do_format=True, rolling_restart=False, env=None):
-  import params
-  #we need this directory to be present before any action(HA manual steps for
-  #additional namenode)
   if action == "configure":
+    import params
+    #we need this directory to be present before any action(HA manual steps for
+    #additional namenode)
     create_name_dirs(params.dfs_name_dir)
-
-  if action == "start":
+  elif action == "start":
+    setup_ranger_hdfs()
+    import params
     if do_format:
       format_namenode()
       pass
@@ -62,7 +66,7 @@ def namenode(action=None, do_format=True, rolling_restart=False, env=None):
 
     options = "-rollingUpgrade started" if rolling_restart else ""
 
-    if rolling_restart:    
+    if rolling_restart:
       # Must start Zookeeper Failover Controller if it exists on this host because it could have been killed in order to initiate the failover.
       safe_zkfc_op(action, env)
 
@@ -75,7 +79,6 @@ def namenode(action=None, do_format=True, rolling_restart=False, env=None):
       create_log_dir=True
     )
 
-
     if params.security_enabled:
       Execute(format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name}"),
               user = params.hdfs_user)
@@ -113,14 +116,38 @@ def namenode(action=None, do_format=True, rolling_restart=False, env=None):
             only_if=dfs_check_nn_status_cmd #skip when HA not active
     )
     create_hdfs_directories(dfs_check_nn_status_cmd)
-
-  if action == "stop":
+  elif action == "stop":
+    import params
     service(
       action="stop", name="namenode", 
       user=params.hdfs_user
     )
+  elif action == "status":
+    import status_params
+    check_process_status(status_params.namenode_pid_file)
+  elif action == "decommission":
+    decommission()
 
-  if action == "decommission":
+@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
+def namenode(action=None, do_format=True, rolling_restart=False, env=None):
+  if action == "configure":
+    pass
+  elif action == "start":
+    import params
+    #TODO: Replace with format_namenode()
+    namenode_format_marker = os.path.join(params.hadoop_conf_dir,"NN_FORMATTED")
+    if not os.path.exists(namenode_format_marker):
+      hadoop_cmd = "cmd /C %s" % (os.path.join(params.hadoop_home, "bin", "hadoop.cmd"))
+      Execute("%s namenode -format" % (hadoop_cmd))
+      open(namenode_format_marker, 'a').close()
+    Service(params.namenode_win_service_name, action=action)
+  elif action == "stop":
+    import params
+    Service(params.namenode_win_service_name, action=action)
+  elif action == "status":
+    import status_params
+    check_windows_service_status(status_params.namenode_win_service_name)
+  elif action == "decommission":
     decommission()
 
 def create_name_dirs(directories):
@@ -250,8 +277,9 @@ def is_namenode_formatted(params):
       print format("ERROR: Namenode directory(s) is non empty. Will not format the namenode. List of non-empty namenode dirs {nn_name_dirs}")
       break
        
-  return marked    
-      
+  return marked
+
+@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
 def decommission():
   import params
 
@@ -283,6 +311,26 @@ def decommission():
                   kinit_override=True,
                   bin_dir=params.hadoop_bin_dir)
 
+@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
+def decommission():
+  import params
+  hdfs_user = params.hdfs_user
+  conf_dir = params.hadoop_conf_dir
+
+  File(params.exclude_file_path,
+       content=Template("exclude_hosts_list.j2"),
+       owner=hdfs_user
+  )
+
+  if params.dfs_ha_enabled:
+    # due to a bug in hdfs, refreshNodes will not run on both namenodes so we
+    # need to execute each command scoped to a particular namenode
+    nn_refresh_cmd = format('cmd /c hadoop dfsadmin -fs hdfs://{namenode_rpc} -refreshNodes')
+  else:
+    nn_refresh_cmd = format('cmd /c hadoop dfsadmin -refreshNodes')
+  Execute(nn_refresh_cmd, user=hdfs_user)
+
+
 def bootstrap_standby_namenode(params):
   try:
     iterations = 50

+ 21 - 3
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_snamenode.py

@@ -20,12 +20,13 @@ limitations under the License.
 from resource_management import *
 from utils import service
 from utils import hdfs_directory
+from ambari_commons.os_family_impl import OsFamilyImpl, OsFamilyFuncImpl
+from ambari_commons import OSConst
 
-
+@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
 def snamenode(action=None, format=False):
-  import params
-
   if action == "configure":
+    import params
     for fs_checkpoint_dir in params.fs_checkpoint_dirs:
       Directory(fs_checkpoint_dir,
                 recursive=True,
@@ -38,6 +39,7 @@ def snamenode(action=None, format=False):
          owner=params.hdfs_user,
          group=params.user_group)
   elif action == "start" or action == "stop":
+    import params
     Directory(params.hadoop_pid_dir_prefix,
               mode=0755,
               owner=params.hdfs_user,
@@ -50,3 +52,19 @@ def snamenode(action=None, format=False):
       create_pid_dir=True,
       create_log_dir=True
     )
+  elif action == "status":
+    import status_params
+    check_process_status(status_params.snamenode_pid_file)
+
+
+@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
+def snamenode(action=None, format=False):
+  if action == "configure":
+    pass
+  elif action == "start" or action == "stop":
+    import params
+    Service(params.snamenode_win_service_name, action=action)
+  elif action == "status":
+    import status_params
+    check_windows_service_status(status_params.snamenode_win_service_name)
+

+ 30 - 6
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py

@@ -28,19 +28,21 @@ from resource_management.libraries.functions.security_commons import build_expec
 from utils import service
 from hdfs import hdfs
 import journalnode_upgrade
-
+from ambari_commons.os_family_impl import OsFamilyImpl
+from ambari_commons import OSConst
 
 class JournalNode(Script):
-
-  def get_stack_to_component(self):
-    return {"HDP": "hadoop-hdfs-journalnode"}
-
   def install(self, env):
     import params
-
     self.install_packages(env, params.exclude_packages)
     env.set_params(params)
 
+@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
+class JournalNodeDefault(JournalNode):
+
+  def get_stack_to_component(self):
+    return {"HDP": "hadoop-hdfs-journalnode"}
+
   def pre_rolling_restart(self, env):
     Logger.info("Executing Rolling Upgrade pre-restart")
     import params
@@ -157,6 +159,28 @@ class JournalNode(Script):
     else:
       self.put_structured_out({"securityState": "UNSECURED"})
 
+@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
+class JournalNodeWindows(JournalNode):
+
+  def start(self, env):
+    import params
+    self.configure(env)
+    Service(params.journalnode_win_service_name, action="start")
+
+  def stop(self, env):
+    import params
+    Service(params.journalnode_win_service_name, action="stop")
+
+  def configure(self, env):
+    import params
+    env.set_params(params)
+    hdfs()
+    pass
+
+  def status(self, env):
+    import status_params
+    env.set_params(status_params)
+    check_windows_service_status(status_params.journalnode_win_service_name)
 
 if __name__ == "__main__":
   JournalNode().execute()

+ 84 - 48
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py

@@ -28,16 +28,17 @@ from resource_management.libraries.functions.security_commons import build_expec
 from resource_management.libraries.functions.version import compare_versions, \
   format_hdp_stack_version
 from resource_management.libraries.functions.format import format
-from resource_management.libraries.functions.check_process_status import check_process_status
 from resource_management.core.exceptions import Fail
 from resource_management.libraries.functions import get_klist_path
+from datetime import datetime
 
 import namenode_upgrade
 from hdfs_namenode import namenode
 from hdfs import hdfs
 import hdfs_rebalance
 from utils import failover_namenode
-from setup_ranger_hdfs import setup_ranger_hdfs
+from ambari_commons.os_family_impl import OsFamilyImpl
+from ambari_commons import OSConst
 
 # hashlib is supplied as of Python 2.5 as the replacement interface for md5
 # and other secure hashes.  In 2.6, md5 is deprecated.  Import hashlib if
@@ -52,74 +53,73 @@ except ImportError:
 
 class NameNode(Script):
 
-  def get_stack_to_component(self):
-    return {"HDP": "hadoop-hdfs-namenode"}
-
   def install(self, env):
     import params
-
     self.install_packages(env, params.exclude_packages)
     env.set_params(params)
     #TODO we need this for HA because of manual steps
     self.configure(env)
 
-  def prepare_rolling_upgrade(self, env):
-    namenode_upgrade.prepare_rolling_upgrade()
-
-  def finalize_rolling_upgrade(self, env):
-    namenode_upgrade.finalize_rolling_upgrade()
-
-  def pre_rolling_restart(self, env):
-    Logger.info("Executing Rolling Upgrade pre-restart")
+  def configure(self, env):
     import params
     env.set_params(params)
-
-    if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:
-      Execute(format("hdp-select set hadoop-hdfs-namenode {version}"))
+    hdfs("namenode")
+    namenode(action="configure", env=env)
 
   def start(self, env, rolling_restart=False):
     import params
-
     env.set_params(params)
     self.configure(env)
-    setup_ranger_hdfs()
     namenode(action="start", rolling_restart=rolling_restart, env=env)
 
-  def post_rolling_restart(self, env):
-    Logger.info("Executing Rolling Upgrade post-restart")
-    import params
-    env.set_params(params)
-
-    Execute("hdfs dfsadmin -report -live",
-            user=params.hdfs_user
-    )
-
   def stop(self, env, rolling_restart=False):
     import params
     env.set_params(params)
-
     if rolling_restart and params.dfs_ha_enabled:
       if params.dfs_ha_automatic_failover_enabled:
         failover_namenode()
       else:
         raise Fail("Rolling Upgrade - dfs.ha.automatic-failover.enabled must be enabled to perform a rolling restart")
-
     namenode(action="stop", rolling_restart=rolling_restart, env=env)
 
-  def configure(self, env):
+  def status(self, env):
+    import status_params
+    env.set_params(status_params)
+    namenode(action="status", rolling_restart=False, env=env)
+
+  def decommission(self, env):
     import params
+    env.set_params(params)
+    namenode(action="decommission")
+
+@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
+class NameNodeDefault(NameNode):
 
+  def get_stack_to_component(self):
+    return {"HDP": "hadoop-hdfs-namenode"}
+
+  def prepare_rolling_upgrade(self, env):
+    namenode_upgrade.prepare_rolling_upgrade()
+
+  def finalize_rolling_upgrade(self, env):
+    namenode_upgrade.finalize_rolling_upgrade()
+
+  def pre_rolling_restart(self, env):
+    Logger.info("Executing Rolling Upgrade pre-restart")
+    import params
     env.set_params(params)
-    hdfs()
-    namenode(action="configure", env=env)
-    pass
 
-  def status(self, env):
-    import status_params
+    if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:
+      Execute(format("hdp-select set hadoop-hdfs-namenode {version}"))
 
-    env.set_params(status_params)
-    check_process_status(status_params.namenode_pid_file)
-    pass
+  def post_rolling_restart(self, env):
+    Logger.info("Executing Rolling Upgrade post-restart")
+    import params
+    env.set_params(params)
+
+    Execute("hdfs dfsadmin -report -live",
+            user=params.hdfs_user
+    )
 
   def security_status(self, env):
     import status_params
@@ -178,15 +178,6 @@ class NameNode(Script):
     else:
       self.put_structured_out({"securityState": "UNSECURED"})
 
-
-  def decommission(self, env):
-    import params
-
-    env.set_params(params)
-    namenode(action="decommission")
-    pass
-
-
   def rebalancehdfs(self, env):
     import params
     env.set_params(params)
@@ -255,6 +246,51 @@ class NameNode(Script):
       # Delete the kerberos credentials cache (ccache) file
       os.remove(ccache_file_path)
 
+@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
+class NameNodeWindows(NameNode):
+
+  def rebalancehdfs(self, env):
+    from ambari_commons.os_windows import run_os_command_impersonated
+    import params
+    env.set_params(params)
+
+    hdfs_user = params.hdfs_user
+
+    name_node_parameters = json.loads( params.name_node_params )
+    threshold = name_node_parameters['threshold']
+    _print("Starting balancer with threshold = %s\n" % threshold)
+
+    def calculateCompletePercent(first, current):
+      return 1.0 - current.bytesLeftToMove/first.bytesLeftToMove
+
+    def startRebalancingProcess(threshold):
+      rebalanceCommand = 'hdfs balancer -threshold %s' % threshold
+      return ['cmd', '/C', rebalanceCommand]
+
+    command = startRebalancingProcess(threshold)
+    basedir = os.path.join(env.config.basedir, 'scripts')
+
+    _print("Executing command %s\n" % command)
+
+    parser = hdfs_rebalance.HdfsParser()
+    returncode, stdout, err = run_os_command_impersonated(' '.join(command), hdfs_user, Script.get_password(hdfs_user))
+
+    for line in stdout.split('\n'):
+      _print('[balancer] %s %s' % (str(datetime.now()), line ))
+      pl = parser.parseLine(line)
+      if pl:
+        res = pl.toJson()
+        res['completePercent'] = calculateCompletePercent(parser.initialLine, pl)
+
+        self.put_structured_out(res)
+      elif parser.state == 'PROCESS_FINISED' :
+        _print('[balancer] %s %s' % (str(datetime.now()), 'Process is finished' ))
+        self.put_structured_out({'completePercent' : 1})
+        break
+
+    if returncode != None and returncode != 0:
+      raise Fail('Hdfs rebalance process exited with error. See the log output')
+
 def _print(line):
   sys.stdout.write(line)
   sys.stdout.flush()

+ 4 - 379
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params.py

@@ -16,384 +16,9 @@ See the License for the specific language governing permissions and
 limitations under the License.
 
 """
+from ambari_commons import OSCheck
 
-from resource_management.libraries.functions.version import format_hdp_stack_version, compare_versions
-from ambari_commons.os_check import OSCheck
-from resource_management.libraries.functions.default import default
-from resource_management import *
-import status_params
-import utils
-import os
-import itertools
-import re
-
-config = Script.get_config()
-tmp_dir = Script.get_tmp_dir()
-
-stack_name = default("/hostLevelParams/stack_name", None)
-upgrade_direction = default("/commandParams/upgrade_direction", None)
-
-stack_version_unformatted = str(config['hostLevelParams']['stack_version'])
-hdp_stack_version = format_hdp_stack_version(stack_version_unformatted)
-
-# New Cluster Stack Version that is defined during the RESTART of a Rolling Upgrade
-version = default("/commandParams/version", None)
-
-security_enabled = config['configurations']['cluster-env']['security_enabled']
-hdfs_user = status_params.hdfs_user
-root_user = "root"
-hadoop_pid_dir_prefix = status_params.hadoop_pid_dir_prefix
-
-# Some datanode settings
-dfs_dn_addr = default('/configurations/hdfs-site/dfs.datanode.address', None)
-dfs_dn_http_addr = default('/configurations/hdfs-site/dfs.datanode.http.address', None)
-dfs_dn_https_addr = default('/configurations/hdfs-site/dfs.datanode.https.address', None)
-dfs_http_policy = default('/configurations/hdfs-site/dfs.http.policy', None)
-dfs_dn_ipc_address = config['configurations']['hdfs-site']['dfs.datanode.ipc.address']
-secure_dn_ports_are_in_use = False
-
-#hadoop params
-if hdp_stack_version != "" and compare_versions(hdp_stack_version, '2.2') >= 0:
-  mapreduce_libs_path = "/usr/hdp/current/hadoop-mapreduce-client/*"
-  hadoop_libexec_dir = "/usr/hdp/current/hadoop-client/libexec"
-  hadoop_bin = "/usr/hdp/current/hadoop-client/sbin"
-  hadoop_bin_dir = "/usr/hdp/current/hadoop-client/bin"
-  hadoop_home = "/usr/hdp/current/hadoop-client"
-  if not security_enabled:
-    hadoop_secure_dn_user = '""'
-  else:
-    dfs_dn_port = utils.get_port(dfs_dn_addr)
-    dfs_dn_http_port = utils.get_port(dfs_dn_http_addr)
-    dfs_dn_https_port = utils.get_port(dfs_dn_https_addr)
-    # We try to avoid inability to start datanode as a plain user due to usage of root-owned ports
-    if dfs_http_policy == "HTTPS_ONLY":
-      secure_dn_ports_are_in_use = utils.is_secure_port(dfs_dn_port) or utils.is_secure_port(dfs_dn_https_port)
-    elif dfs_http_policy == "HTTP_AND_HTTPS":
-      secure_dn_ports_are_in_use = utils.is_secure_port(dfs_dn_port) or utils.is_secure_port(dfs_dn_http_port) or utils.is_secure_port(dfs_dn_https_port)
-    else:   # params.dfs_http_policy == "HTTP_ONLY" or not defined:
-      secure_dn_ports_are_in_use = utils.is_secure_port(dfs_dn_port) or utils.is_secure_port(dfs_dn_http_port)
-    if secure_dn_ports_are_in_use:
-      hadoop_secure_dn_user = hdfs_user
-    else:
-      hadoop_secure_dn_user = '""'
-else:
-  mapreduce_libs_path = "/usr/lib/hadoop-mapreduce/*"
-  hadoop_libexec_dir = "/usr/lib/hadoop/libexec"
-  hadoop_bin = "/usr/lib/hadoop/sbin"
-  hadoop_bin_dir = "/usr/bin"
-  hadoop_home = "/usr/lib/hadoop"
-  hadoop_secure_dn_user = hdfs_user
-
-hadoop_conf_dir = "/etc/hadoop/conf"
-hadoop_conf_empty_dir = "/etc/hadoop/conf.empty"
-limits_conf_dir = "/etc/security/limits.d"
-
-execute_path = os.environ['PATH'] + os.pathsep + hadoop_bin_dir
-ulimit_cmd = "ulimit -c unlimited ; "
-
-#security params
-smoke_user_keytab = config['configurations']['cluster-env']['smokeuser_keytab']
-hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab']
-falcon_user = config['configurations']['falcon-env']['falcon_user']
-
-#exclude file
-hdfs_exclude_file = default("/clusterHostInfo/decom_dn_hosts", [])
-exclude_file_path = config['configurations']['hdfs-site']['dfs.hosts.exclude']
-update_exclude_file_only = default("/commandParams/update_exclude_file_only",False)
-
-klist_path_local = functions.get_klist_path()
-kinit_path_local = functions.get_kinit_path()
-#hosts
-hostname = config["hostname"]
-rm_host = default("/clusterHostInfo/rm_host", [])
-slave_hosts = default("/clusterHostInfo/slave_hosts", [])
-oozie_servers = default("/clusterHostInfo/oozie_server", [])
-hcat_server_hosts = default("/clusterHostInfo/webhcat_server_host", [])
-hive_server_host =  default("/clusterHostInfo/hive_server_host", [])
-hbase_master_hosts = default("/clusterHostInfo/hbase_master_hosts", [])
-hs_host = default("/clusterHostInfo/hs_host", [])
-jtnode_host = default("/clusterHostInfo/jtnode_host", [])
-namenode_host = default("/clusterHostInfo/namenode_host", [])
-nm_host = default("/clusterHostInfo/nm_host", [])
-ganglia_server_hosts = default("/clusterHostInfo/ganglia_server_host", [])
-journalnode_hosts = default("/clusterHostInfo/journalnode_hosts", [])
-zkfc_hosts = default("/clusterHostInfo/zkfc_hosts", [])
-falcon_host = default("/clusterHostInfo/falcon_server_hosts", [])
-
-has_ganglia_server = not len(ganglia_server_hosts) == 0
-has_namenodes = not len(namenode_host) == 0
-has_jobtracker = not len(jtnode_host) == 0
-has_resourcemanager = not len(rm_host) == 0
-has_histroryserver = not len(hs_host) == 0
-has_hbase_masters = not len(hbase_master_hosts) == 0
-has_slaves = not len(slave_hosts) == 0
-has_oozie_server = not len(oozie_servers)  == 0
-has_hcat_server_host = not len(hcat_server_hosts)  == 0
-has_hive_server_host = not len(hive_server_host)  == 0
-has_journalnode_hosts = not len(journalnode_hosts)  == 0
-has_zkfc_hosts = not len(zkfc_hosts)  == 0
-has_falcon_host = not len(falcon_host)  == 0
-
-
-is_namenode_master = hostname in namenode_host
-is_jtnode_master = hostname in jtnode_host
-is_rmnode_master = hostname in rm_host
-is_hsnode_master = hostname in hs_host
-is_hbase_master = hostname in hbase_master_hosts
-is_slave = hostname in slave_hosts
-
-if has_ganglia_server:
-  ganglia_server_host = ganglia_server_hosts[0]
-
-#users and groups
-yarn_user = config['configurations']['yarn-env']['yarn_user']
-hbase_user = config['configurations']['hbase-env']['hbase_user']
-oozie_user = config['configurations']['oozie-env']['oozie_user']
-webhcat_user = config['configurations']['hive-env']['hcat_user']
-hcat_user = config['configurations']['hive-env']['hcat_user']
-hive_user = config['configurations']['hive-env']['hive_user']
-smoke_user =  config['configurations']['cluster-env']['smokeuser']
-smokeuser_principal =  config['configurations']['cluster-env']['smokeuser_principal_name']
-mapred_user = config['configurations']['mapred-env']['mapred_user']
-hdfs_principal_name = default('/configurations/hadoop-env/hdfs_principal_name', None)
-
-user_group = config['configurations']['cluster-env']['user_group']
-root_group = "root"
-proxyuser_group =  config['configurations']['hadoop-env']['proxyuser_group']
-
-#hadoop params
-hdfs_log_dir_prefix = config['configurations']['hadoop-env']['hdfs_log_dir_prefix']
-hadoop_root_logger = config['configurations']['hadoop-env']['hadoop_root_logger']
-
-dfs_domain_socket_path = config['configurations']['hdfs-site']['dfs.domain.socket.path']
-dfs_domain_socket_dir = os.path.dirname(dfs_domain_socket_path)
-
-jn_edits_dir = config['configurations']['hdfs-site']['dfs.journalnode.edits.dir']
-
-dfs_name_dir = config['configurations']['hdfs-site']['dfs.namenode.name.dir']
-
-namenode_dirs_created_stub_dir = format("{hdfs_log_dir_prefix}/{hdfs_user}")
-namenode_dirs_stub_filename = "namenode_dirs_created"
-
-smoke_hdfs_user_dir = format("/user/{smoke_user}")
-smoke_hdfs_user_mode = 0770
-
-
-hdfs_namenode_formatted_mark_suffix = "/namenode-formatted/"
-namenode_formatted_old_mark_dirs = ["/var/run/hadoop/hdfs/namenode-formatted", 
-  format("{hadoop_pid_dir_prefix}/hdfs/namenode/formatted"),
-  "/var/lib/hdfs/namenode/formatted"]
-dfs_name_dirs = dfs_name_dir.split(",")
-namenode_formatted_mark_dirs = []
-for dn_dir in dfs_name_dirs:
- tmp_mark_dir = format("{dn_dir}{hdfs_namenode_formatted_mark_suffix}")
- namenode_formatted_mark_dirs.append(tmp_mark_dir)
-
-# Use the namenode RPC address if configured, otherwise, fallback to the default file system
-namenode_address = None
-if 'dfs.namenode.rpc-address' in config['configurations']['hdfs-site']:
-  namenode_rpcaddress = config['configurations']['hdfs-site']['dfs.namenode.rpc-address']
-  namenode_address = format("hdfs://{namenode_rpcaddress}")
-else:
-  namenode_address = config['configurations']['core-site']['fs.defaultFS']
-
-fs_checkpoint_dirs = config['configurations']['hdfs-site']['dfs.namenode.checkpoint.dir'].split(',')
-
-dfs_data_dir = config['configurations']['hdfs-site']['dfs.datanode.data.dir']
-dfs_data_dir = ",".join([re.sub(r'^\[.+\]', '', dfs_dir.strip()) for dfs_dir in dfs_data_dir.split(",")])
-
-data_dir_mount_file = config['configurations']['hadoop-env']['dfs.datanode.data.dir.mount.file']
-
-# HDFS High Availability properties
-dfs_ha_enabled = False
-dfs_ha_nameservices = default("/configurations/hdfs-site/dfs.nameservices", None)
-dfs_ha_namenode_ids = default(format("/configurations/hdfs-site/dfs.ha.namenodes.{dfs_ha_nameservices}"), None)
-dfs_ha_automatic_failover_enabled = default("/configurations/hdfs-site/dfs.ha.automatic-failover.enabled", False)
-
-# hostname of the active HDFS HA Namenode (only used when HA is enabled)
-dfs_ha_namenode_active = default("/configurations/hadoop-env/dfs_ha_initial_namenode_active", None)
-# hostname of the standby HDFS HA Namenode (only used when HA is enabled)
-dfs_ha_namenode_standby = default("/configurations/hadoop-env/dfs_ha_initial_namenode_standby", None)
-
-namenode_id = None
-namenode_rpc = None
-
-if dfs_ha_namenode_ids:
-  dfs_ha_namemodes_ids_list = dfs_ha_namenode_ids.split(",")
-  dfs_ha_namenode_ids_array_len = len(dfs_ha_namemodes_ids_list)
-  if dfs_ha_namenode_ids_array_len > 1:
-    dfs_ha_enabled = True
-if dfs_ha_enabled:
-  for nn_id in dfs_ha_namemodes_ids_list:
-    nn_host = config['configurations']['hdfs-site'][format('dfs.namenode.rpc-address.{dfs_ha_nameservices}.{nn_id}')]
-    if hostname in nn_host:
-      namenode_id = nn_id
-      namenode_rpc = nn_host
-  # With HA enabled namenode_address is recomputed
-  namenode_address = format('hdfs://{dfs_ha_nameservices}')
-
-if dfs_http_policy is not None and dfs_http_policy.upper() == "HTTPS_ONLY":
-  https_only = True
-  journalnode_address = default('/configurations/hdfs-site/dfs.journalnode.https-address', None)
-else:
-  https_only = False
-  journalnode_address = default('/configurations/hdfs-site/dfs.journalnode.http-address', None)
-
-if journalnode_address:
-  journalnode_port = journalnode_address.split(":")[1]
-  
-  
-if security_enabled:
-  _dn_principal_name = config['configurations']['hdfs-site']['dfs.datanode.kerberos.principal']
-  _dn_keytab = config['configurations']['hdfs-site']['dfs.datanode.keytab.file']
-  _dn_principal_name = _dn_principal_name.replace('_HOST',hostname.lower())
-  
-  dn_kinit_cmd = format("{kinit_path_local} -kt {_dn_keytab} {_dn_principal_name};")
-  
-  _nn_principal_name = config['configurations']['hdfs-site']['dfs.namenode.kerberos.principal']
-  _nn_keytab = config['configurations']['hdfs-site']['dfs.namenode.keytab.file']
-  _nn_principal_name = _nn_principal_name.replace('_HOST',hostname.lower())
-  
-  nn_kinit_cmd = format("{kinit_path_local} -kt {_nn_keytab} {_nn_principal_name};")
-
-  _jn_principal_name = default("/configurations/hdfs-site/dfs.journalnode.kerberos.principal", None)
-  if _jn_principal_name:
-    _jn_principal_name = _jn_principal_name.replace('_HOST', hostname.lower())
-  _jn_keytab = default("/configurations/hdfs-site/dfs.journalnode.keytab.file", None)
-  jn_kinit_cmd = format("{kinit_path_local} -kt {_jn_keytab} {_jn_principal_name};")
-else:
-  dn_kinit_cmd = ""
-  nn_kinit_cmd = ""
-  jn_kinit_cmd = ""
-
-import functools
-#create partial functions with common arguments for every HdfsDirectory call
-#to create hdfs directory we need to call params.HdfsDirectory in code
-HdfsDirectory = functools.partial(
-  HdfsDirectory,
-  conf_dir=hadoop_conf_dir,
-  hdfs_user=hdfs_user,
-  security_enabled = security_enabled,
-  keytab = hdfs_user_keytab,
-  kinit_path_local = kinit_path_local,
-  bin_dir = hadoop_bin_dir
-)
-
-# The logic for LZO also exists in OOZIE's params.py
-io_compression_codecs = default("/configurations/core-site/io.compression.codecs", None)
-lzo_enabled = io_compression_codecs is not None and "com.hadoop.compression.lzo" in io_compression_codecs.lower()
-lzo_packages = get_lzo_packages(stack_version_unformatted)
-
-exclude_packages = []
-if not lzo_enabled:
-  exclude_packages += lzo_packages
-  
-name_node_params = default("/commandParams/namenode", None)
-
-#hadoop params
-hadoop_env_sh_template = config['configurations']['hadoop-env']['content']
-
-#hadoop-env.sh
-java_home = config['hostLevelParams']['java_home']
-java_version = int(config['hostLevelParams']['java_version'])
-
-if hdp_stack_version != "" and compare_versions(hdp_stack_version, '2.0') >= 0 and compare_versions(hdp_stack_version, '2.1') < 0 and not OSCheck.is_suse_family():
-  # deprecated rhel jsvc_path
-  jsvc_path = "/usr/libexec/bigtop-utils"
+if OSCheck.is_windows_family():
+  from params_windows import *
 else:
-  jsvc_path = "/usr/lib/bigtop-utils"
-
-hadoop_heapsize = config['configurations']['hadoop-env']['hadoop_heapsize']
-namenode_heapsize = config['configurations']['hadoop-env']['namenode_heapsize']
-namenode_opt_newsize = config['configurations']['hadoop-env']['namenode_opt_newsize']
-namenode_opt_maxnewsize = config['configurations']['hadoop-env']['namenode_opt_maxnewsize']
-namenode_opt_permsize = format_jvm_option("/configurations/hadoop-env/namenode_opt_permsize","128m")
-namenode_opt_maxpermsize = format_jvm_option("/configurations/hadoop-env/namenode_opt_maxpermsize","256m")
-
-jtnode_opt_newsize = "200m"
-jtnode_opt_maxnewsize = "200m"
-jtnode_heapsize =  "1024m"
-ttnode_heapsize = "1024m"
-
-dtnode_heapsize = config['configurations']['hadoop-env']['dtnode_heapsize']
-mapred_pid_dir_prefix = default("/configurations/mapred-env/mapred_pid_dir_prefix","/var/run/hadoop-mapreduce")
-mapred_log_dir_prefix = default("/configurations/mapred-env/mapred_log_dir_prefix","/var/log/hadoop-mapreduce")
-
-# ranger host
-ranger_admin_hosts = default("/clusterHostInfo/ranger_admin_hosts", [])
-has_ranger_admin = not len(ranger_admin_hosts) == 0
-
-if hdp_stack_version != "" and compare_versions(hdp_stack_version, '2.2') >= 0:
-  # setting flag value for ranger hdfs plugin
-  enable_ranger_hdfs = False
-  ranger_plugin_enable = default("/configurations/ranger-hdfs-plugin-properties/ranger-hdfs-plugin-enabled", "no")
-  if ranger_plugin_enable.lower() == 'yes':
-    enable_ranger_hdfs = True
-  elif ranger_plugin_enable.lower() == 'no':
-    enable_ranger_hdfs = False
-
-ambari_server_hostname = config['clusterHostInfo']['ambari_server_host'][0]
-
-#ranger hdfs properties
-policymgr_mgr_url = default("/configurations/admin-properties/policymgr_external_url", "http://localhost:6080")
-sql_connector_jar = default("/configurations/admin-properties/SQL_CONNECTOR_JAR", "/usr/share/java/mysql-connector-java.jar")
-xa_audit_db_flavor = default("/configurations/admin-properties/DB_FLAVOR", "MYSQL")
-xa_audit_db_name = default("/configurations/admin-properties/audit_db_name", "ranger_audit")
-xa_audit_db_user = default("/configurations/admin-properties/audit_db_user", "rangerlogger")
-xa_audit_db_password = default("/configurations/admin-properties/audit_db_password", "rangerlogger")
-xa_db_host = default("/configurations/admin-properties/db_host", "localhost")
-repo_name = str(config['clusterName']) + '_hadoop'
-db_enabled = default("/configurations/ranger-hdfs-plugin-properties/XAAUDIT.DB.IS_ENABLED", "false")
-hdfs_enabled = default("/configurations/ranger-hdfs-plugin-properties/XAAUDIT.HDFS.IS_ENABLED", "false")
-hdfs_dest_dir = default("/configurations/ranger-hdfs-plugin-properties/XAAUDIT.HDFS.DESTINATION_DIRECTORY", "hdfs://__REPLACE__NAME_NODE_HOST:8020/ranger/audit/app-type/time:yyyyMMdd")
-hdfs_buffer_dir = default("/configurations/ranger-hdfs-plugin-properties/XAAUDIT.HDFS.LOCAL_BUFFER_DIRECTORY", "__REPLACE__LOG_DIR/hadoop/app-type/audit")
-hdfs_archive_dir = default("/configurations/ranger-hdfs-plugin-properties/XAAUDIT.HDFS.LOCAL_ARCHIVE_DIRECTORY", "__REPLACE__LOG_DIR/hadoop/app-type/audit/archive")
-hdfs_dest_file = default("/configurations/ranger-hdfs-plugin-properties/XAAUDIT.HDFS.DESTINTATION_FILE", "hostname-audit.log")
-hdfs_dest_flush_int_sec = default("/configurations/ranger-hdfs-plugin-properties/XAAUDIT.HDFS.DESTINTATION_FLUSH_INTERVAL_SECONDS", "900")
-hdfs_dest_rollover_int_sec = default("/configurations/ranger-hdfs-plugin-properties/XAAUDIT.HDFS.DESTINTATION_ROLLOVER_INTERVAL_SECONDS", "86400")
-hdfs_dest_open_retry_int_sec = default("/configurations/ranger-hdfs-plugin-properties/XAAUDIT.HDFS.DESTINTATION_OPEN_RETRY_INTERVAL_SECONDS", "60")
-hdfs_buffer_file = default("/configurations/ranger-hdfs-plugin-properties/XAAUDIT.HDFS.LOCAL_BUFFER_FILE", "time:yyyyMMdd-HHmm.ss.log")
-hdfs_buffer_flush_int_sec = default("/configurations/ranger-hdfs-plugin-properties/XAAUDIT.HDFS.LOCAL_BUFFER_FLUSH_INTERVAL_SECONDS", "60")
-hdfs_buffer_rollover_int_sec = default("/configurations/ranger-hdfs-plugin-properties/XAAUDIT.HDFS.LOCAL_BUFFER_ROLLOVER_INTERVAL_SECONDS", "600")
-hdfs_archive_max_file_count = default("/configurations/ranger-hdfs-plugin-properties/XAAUDIT.HDFS.LOCAL_ARCHIVE_MAX_FILE_COUNT", "10")
-ssl_keystore_file = default("/configurations/ranger-hdfs-plugin-properties/SSL_KEYSTORE_FILE_PATH", "/etc/hadoop/conf/ranger-plugin-keystore.jks")
-ssl_keystore_password = default("/configurations/ranger-hdfs-plugin-properties/SSL_KEYSTORE_PASSWORD", "myKeyFilePassword")
-ssl_truststore_file = default("/configurations/ranger-hdfs-plugin-properties/SSL_TRUSTSTORE_FILE_PATH", "/etc/hadoop/conf/ranger-plugin-truststore.jks")
-ssl_truststore_password = default("/configurations/ranger-hdfs-plugin-properties/SSL_TRUSTSTORE_PASSWORD", "changeit")
-
-hadoop_security_authentication = config['configurations']['core-site']['hadoop.security.authentication']
-hadoop_security_authorization = config['configurations']['core-site']['hadoop.security.authorization']
-fs_default_name = config['configurations']['core-site']['fs.defaultFS']
-hadoop_security_auth_to_local = config['configurations']['core-site']['hadoop.security.auth_to_local']
-hadoop_rpc_protection = default("/configurations/ranger-hdfs-plugin-properties/hadoop.rpc.protection", "-")
-common_name_for_certificate = default("/configurations/ranger-hdfs-plugin-properties/common.name.for.certificate", "-")
-
-repo_config_username = default("/configurations/ranger-hdfs-plugin-properties/REPOSITORY_CONFIG_USERNAME", "hadoop")
-repo_config_password = default("/configurations/ranger-hdfs-plugin-properties/REPOSITORY_CONFIG_PASSWORD", "hadoop")
-
-if security_enabled:
-  _sn_principal_name = default("/configurations/hdfs-site/dfs.secondary.namenode.kerberos.principal", "nn/_HOST@EXAMPLE.COM")
-  _sn_principal_name = _sn_principal_name.replace('_HOST',hostname.lower())
-
-admin_uname = default("/configurations/ranger-env/admin_username", "admin")
-admin_password = default("/configurations/ranger-env/admin_password", "admin")
-admin_uname_password = format("{admin_uname}:{admin_password}")
-
-ambari_ranger_admin = default("/configurations/ranger-env/ranger_admin_username", "amb_ranger_admin")
-ambari_ranger_password = default("/configurations/ranger-env/ranger_admin_password", "ambari123")
-policy_user = default("/configurations/ranger-hdfs-plugin-properties/policy_user", "ambari-qa")
-
-#For curl command in ranger plugin to get db connector
-jdk_location = config['hostLevelParams']['jdk_location']
-java_share_dir = '/usr/share/java'
-if xa_audit_db_flavor and xa_audit_db_flavor.lower() == 'mysql':
-  jdbc_symlink_name = "mysql-jdbc-driver.jar"
-  jdbc_jar_name = "mysql-connector-java.jar"
-elif xa_audit_db_flavor and xa_audit_db_flavor.lower() == 'oracle':
-  jdbc_jar_name = "ojdbc6.jar"
-  jdbc_symlink_name = "oracle-jdbc-driver.jar"
-
-downloaded_custom_connector = format("{tmp_dir}/{jdbc_jar_name}")
-
-driver_curl_source = format("{jdk_location}/{jdbc_symlink_name}")
-driver_curl_target = format("{java_share_dir}/{jdbc_jar_name}")
+  from params_linux import *

+ 399 - 0
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py

@@ -0,0 +1,399 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from resource_management.libraries.functions.version import format_hdp_stack_version, compare_versions
+from ambari_commons.os_check import OSCheck
+from resource_management.libraries.functions.default import default
+from resource_management import *
+import status_params
+import utils
+import os
+import itertools
+import re
+
+config = Script.get_config()
+tmp_dir = Script.get_tmp_dir()
+
+stack_name = default("/hostLevelParams/stack_name", None)
+upgrade_direction = default("/commandParams/upgrade_direction", None)
+
+stack_version_unformatted = str(config['hostLevelParams']['stack_version'])
+hdp_stack_version = format_hdp_stack_version(stack_version_unformatted)
+
+# New Cluster Stack Version that is defined during the RESTART of a Rolling Upgrade
+version = default("/commandParams/version", None)
+
+security_enabled = config['configurations']['cluster-env']['security_enabled']
+hdfs_user = status_params.hdfs_user
+root_user = "root"
+hadoop_pid_dir_prefix = status_params.hadoop_pid_dir_prefix
+
+# Some datanode settings
+dfs_dn_addr = default('/configurations/hdfs-site/dfs.datanode.address', None)
+dfs_dn_http_addr = default('/configurations/hdfs-site/dfs.datanode.http.address', None)
+dfs_dn_https_addr = default('/configurations/hdfs-site/dfs.datanode.https.address', None)
+dfs_http_policy = default('/configurations/hdfs-site/dfs.http.policy', None)
+dfs_dn_ipc_address = config['configurations']['hdfs-site']['dfs.datanode.ipc.address']
+secure_dn_ports_are_in_use = False
+
+#hadoop params
+if hdp_stack_version != "" and compare_versions(hdp_stack_version, '2.2') >= 0:
+  mapreduce_libs_path = "/usr/hdp/current/hadoop-mapreduce-client/*"
+  hadoop_libexec_dir = "/usr/hdp/current/hadoop-client/libexec"
+  hadoop_bin = "/usr/hdp/current/hadoop-client/sbin"
+  hadoop_bin_dir = "/usr/hdp/current/hadoop-client/bin"
+  hadoop_home = "/usr/hdp/current/hadoop-client"
+  if not security_enabled:
+    hadoop_secure_dn_user = '""'
+  else:
+    dfs_dn_port = utils.get_port(dfs_dn_addr)
+    dfs_dn_http_port = utils.get_port(dfs_dn_http_addr)
+    dfs_dn_https_port = utils.get_port(dfs_dn_https_addr)
+    # We try to avoid inability to start datanode as a plain user due to usage of root-owned ports
+    if dfs_http_policy == "HTTPS_ONLY":
+      secure_dn_ports_are_in_use = utils.is_secure_port(dfs_dn_port) or utils.is_secure_port(dfs_dn_https_port)
+    elif dfs_http_policy == "HTTP_AND_HTTPS":
+      secure_dn_ports_are_in_use = utils.is_secure_port(dfs_dn_port) or utils.is_secure_port(dfs_dn_http_port) or utils.is_secure_port(dfs_dn_https_port)
+    else:   # params.dfs_http_policy == "HTTP_ONLY" or not defined:
+      secure_dn_ports_are_in_use = utils.is_secure_port(dfs_dn_port) or utils.is_secure_port(dfs_dn_http_port)
+    if secure_dn_ports_are_in_use:
+      hadoop_secure_dn_user = hdfs_user
+    else:
+      hadoop_secure_dn_user = '""'
+else:
+  mapreduce_libs_path = "/usr/lib/hadoop-mapreduce/*"
+  hadoop_libexec_dir = "/usr/lib/hadoop/libexec"
+  hadoop_bin = "/usr/lib/hadoop/sbin"
+  hadoop_bin_dir = "/usr/bin"
+  hadoop_home = "/usr/lib/hadoop"
+  hadoop_secure_dn_user = hdfs_user
+
+hadoop_conf_dir = "/etc/hadoop/conf"
+hadoop_conf_empty_dir = "/etc/hadoop/conf.empty"
+limits_conf_dir = "/etc/security/limits.d"
+
+execute_path = os.environ['PATH'] + os.pathsep + hadoop_bin_dir
+ulimit_cmd = "ulimit -c unlimited ; "
+
+#security params
+smoke_user_keytab = config['configurations']['cluster-env']['smokeuser_keytab']
+hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab']
+falcon_user = config['configurations']['falcon-env']['falcon_user']
+
+#exclude file
+hdfs_exclude_file = default("/clusterHostInfo/decom_dn_hosts", [])
+exclude_file_path = config['configurations']['hdfs-site']['dfs.hosts.exclude']
+update_exclude_file_only = default("/commandParams/update_exclude_file_only",False)
+
+klist_path_local = functions.get_klist_path()
+kinit_path_local = functions.get_kinit_path()
+#hosts
+hostname = config["hostname"]
+rm_host = default("/clusterHostInfo/rm_host", [])
+slave_hosts = default("/clusterHostInfo/slave_hosts", [])
+oozie_servers = default("/clusterHostInfo/oozie_server", [])
+hcat_server_hosts = default("/clusterHostInfo/webhcat_server_host", [])
+hive_server_host =  default("/clusterHostInfo/hive_server_host", [])
+hbase_master_hosts = default("/clusterHostInfo/hbase_master_hosts", [])
+hs_host = default("/clusterHostInfo/hs_host", [])
+jtnode_host = default("/clusterHostInfo/jtnode_host", [])
+namenode_host = default("/clusterHostInfo/namenode_host", [])
+nm_host = default("/clusterHostInfo/nm_host", [])
+ganglia_server_hosts = default("/clusterHostInfo/ganglia_server_host", [])
+journalnode_hosts = default("/clusterHostInfo/journalnode_hosts", [])
+zkfc_hosts = default("/clusterHostInfo/zkfc_hosts", [])
+falcon_host = default("/clusterHostInfo/falcon_server_hosts", [])
+
+has_ganglia_server = not len(ganglia_server_hosts) == 0
+has_namenodes = not len(namenode_host) == 0
+has_jobtracker = not len(jtnode_host) == 0
+has_resourcemanager = not len(rm_host) == 0
+has_histroryserver = not len(hs_host) == 0
+has_hbase_masters = not len(hbase_master_hosts) == 0
+has_slaves = not len(slave_hosts) == 0
+has_oozie_server = not len(oozie_servers)  == 0
+has_hcat_server_host = not len(hcat_server_hosts)  == 0
+has_hive_server_host = not len(hive_server_host)  == 0
+has_journalnode_hosts = not len(journalnode_hosts)  == 0
+has_zkfc_hosts = not len(zkfc_hosts)  == 0
+has_falcon_host = not len(falcon_host)  == 0
+
+
+is_namenode_master = hostname in namenode_host
+is_jtnode_master = hostname in jtnode_host
+is_rmnode_master = hostname in rm_host
+is_hsnode_master = hostname in hs_host
+is_hbase_master = hostname in hbase_master_hosts
+is_slave = hostname in slave_hosts
+
+if has_ganglia_server:
+  ganglia_server_host = ganglia_server_hosts[0]
+
+#users and groups
+yarn_user = config['configurations']['yarn-env']['yarn_user']
+hbase_user = config['configurations']['hbase-env']['hbase_user']
+oozie_user = config['configurations']['oozie-env']['oozie_user']
+webhcat_user = config['configurations']['hive-env']['hcat_user']
+hcat_user = config['configurations']['hive-env']['hcat_user']
+hive_user = config['configurations']['hive-env']['hive_user']
+smoke_user =  config['configurations']['cluster-env']['smokeuser']
+smokeuser_principal =  config['configurations']['cluster-env']['smokeuser_principal_name']
+mapred_user = config['configurations']['mapred-env']['mapred_user']
+hdfs_principal_name = default('/configurations/hadoop-env/hdfs_principal_name', None)
+
+user_group = config['configurations']['cluster-env']['user_group']
+root_group = "root"
+proxyuser_group =  config['configurations']['hadoop-env']['proxyuser_group']
+
+#hadoop params
+hdfs_log_dir_prefix = config['configurations']['hadoop-env']['hdfs_log_dir_prefix']
+hadoop_root_logger = config['configurations']['hadoop-env']['hadoop_root_logger']
+
+dfs_domain_socket_path = config['configurations']['hdfs-site']['dfs.domain.socket.path']
+dfs_domain_socket_dir = os.path.dirname(dfs_domain_socket_path)
+
+jn_edits_dir = config['configurations']['hdfs-site']['dfs.journalnode.edits.dir']
+
+dfs_name_dir = config['configurations']['hdfs-site']['dfs.namenode.name.dir']
+
+namenode_dirs_created_stub_dir = format("{hdfs_log_dir_prefix}/{hdfs_user}")
+namenode_dirs_stub_filename = "namenode_dirs_created"
+
+smoke_hdfs_user_dir = format("/user/{smoke_user}")
+smoke_hdfs_user_mode = 0770
+
+
+hdfs_namenode_formatted_mark_suffix = "/namenode-formatted/"
+namenode_formatted_old_mark_dirs = ["/var/run/hadoop/hdfs/namenode-formatted", 
+  format("{hadoop_pid_dir_prefix}/hdfs/namenode/formatted"),
+  "/var/lib/hdfs/namenode/formatted"]
+dfs_name_dirs = dfs_name_dir.split(",")
+namenode_formatted_mark_dirs = []
+for dn_dir in dfs_name_dirs:
+ tmp_mark_dir = format("{dn_dir}{hdfs_namenode_formatted_mark_suffix}")
+ namenode_formatted_mark_dirs.append(tmp_mark_dir)
+
+# Use the namenode RPC address if configured, otherwise, fallback to the default file system
+namenode_address = None
+if 'dfs.namenode.rpc-address' in config['configurations']['hdfs-site']:
+  namenode_rpcaddress = config['configurations']['hdfs-site']['dfs.namenode.rpc-address']
+  namenode_address = format("hdfs://{namenode_rpcaddress}")
+else:
+  namenode_address = config['configurations']['core-site']['fs.defaultFS']
+
+fs_checkpoint_dirs = config['configurations']['hdfs-site']['dfs.namenode.checkpoint.dir'].split(',')
+
+dfs_data_dir = config['configurations']['hdfs-site']['dfs.datanode.data.dir']
+dfs_data_dir = ",".join([re.sub(r'^\[.+\]', '', dfs_dir.strip()) for dfs_dir in dfs_data_dir.split(",")])
+
+data_dir_mount_file = config['configurations']['hadoop-env']['dfs.datanode.data.dir.mount.file']
+
+# HDFS High Availability properties
+dfs_ha_enabled = False
+dfs_ha_nameservices = default("/configurations/hdfs-site/dfs.nameservices", None)
+dfs_ha_namenode_ids = default(format("/configurations/hdfs-site/dfs.ha.namenodes.{dfs_ha_nameservices}"), None)
+dfs_ha_automatic_failover_enabled = default("/configurations/hdfs-site/dfs.ha.automatic-failover.enabled", False)
+
+# hostname of the active HDFS HA Namenode (only used when HA is enabled)
+dfs_ha_namenode_active = default("/configurations/hadoop-env/dfs_ha_initial_namenode_active", None)
+# hostname of the standby HDFS HA Namenode (only used when HA is enabled)
+dfs_ha_namenode_standby = default("/configurations/hadoop-env/dfs_ha_initial_namenode_standby", None)
+
+namenode_id = None
+namenode_rpc = None
+
+if dfs_ha_namenode_ids:
+  dfs_ha_namemodes_ids_list = dfs_ha_namenode_ids.split(",")
+  dfs_ha_namenode_ids_array_len = len(dfs_ha_namemodes_ids_list)
+  if dfs_ha_namenode_ids_array_len > 1:
+    dfs_ha_enabled = True
+if dfs_ha_enabled:
+  for nn_id in dfs_ha_namemodes_ids_list:
+    nn_host = config['configurations']['hdfs-site'][format('dfs.namenode.rpc-address.{dfs_ha_nameservices}.{nn_id}')]
+    if hostname in nn_host:
+      namenode_id = nn_id
+      namenode_rpc = nn_host
+  # With HA enabled namenode_address is recomputed
+  namenode_address = format('hdfs://{dfs_ha_nameservices}')
+
+if dfs_http_policy is not None and dfs_http_policy.upper() == "HTTPS_ONLY":
+  https_only = True
+  journalnode_address = default('/configurations/hdfs-site/dfs.journalnode.https-address', None)
+else:
+  https_only = False
+  journalnode_address = default('/configurations/hdfs-site/dfs.journalnode.http-address', None)
+
+if journalnode_address:
+  journalnode_port = journalnode_address.split(":")[1]
+  
+  
+if security_enabled:
+  _dn_principal_name = config['configurations']['hdfs-site']['dfs.datanode.kerberos.principal']
+  _dn_keytab = config['configurations']['hdfs-site']['dfs.datanode.keytab.file']
+  _dn_principal_name = _dn_principal_name.replace('_HOST',hostname.lower())
+  
+  dn_kinit_cmd = format("{kinit_path_local} -kt {_dn_keytab} {_dn_principal_name};")
+  
+  _nn_principal_name = config['configurations']['hdfs-site']['dfs.namenode.kerberos.principal']
+  _nn_keytab = config['configurations']['hdfs-site']['dfs.namenode.keytab.file']
+  _nn_principal_name = _nn_principal_name.replace('_HOST',hostname.lower())
+  
+  nn_kinit_cmd = format("{kinit_path_local} -kt {_nn_keytab} {_nn_principal_name};")
+
+  _jn_principal_name = default("/configurations/hdfs-site/dfs.journalnode.kerberos.principal", None)
+  if _jn_principal_name:
+    _jn_principal_name = _jn_principal_name.replace('_HOST', hostname.lower())
+  _jn_keytab = default("/configurations/hdfs-site/dfs.journalnode.keytab.file", None)
+  jn_kinit_cmd = format("{kinit_path_local} -kt {_jn_keytab} {_jn_principal_name};")
+else:
+  dn_kinit_cmd = ""
+  nn_kinit_cmd = ""
+  jn_kinit_cmd = ""
+
+import functools
+#create partial functions with common arguments for every HdfsDirectory call
+#to create hdfs directory we need to call params.HdfsDirectory in code
+HdfsDirectory = functools.partial(
+  HdfsDirectory,
+  conf_dir=hadoop_conf_dir,
+  hdfs_user=hdfs_user,
+  security_enabled = security_enabled,
+  keytab = hdfs_user_keytab,
+  kinit_path_local = kinit_path_local,
+  bin_dir = hadoop_bin_dir
+)
+
+# The logic for LZO also exists in OOZIE's params.py
+io_compression_codecs = default("/configurations/core-site/io.compression.codecs", None)
+lzo_enabled = io_compression_codecs is not None and "com.hadoop.compression.lzo" in io_compression_codecs.lower()
+lzo_packages = get_lzo_packages(stack_version_unformatted)
+
+exclude_packages = []
+if not lzo_enabled:
+  exclude_packages += lzo_packages
+  
+name_node_params = default("/commandParams/namenode", None)
+
+#hadoop params
+hadoop_env_sh_template = config['configurations']['hadoop-env']['content']
+
+#hadoop-env.sh
+java_home = config['hostLevelParams']['java_home']
+java_version = int(config['hostLevelParams']['java_version'])
+
+if hdp_stack_version != "" and compare_versions(hdp_stack_version, '2.0') >= 0 and compare_versions(hdp_stack_version, '2.1') < 0 and not OSCheck.is_suse_family():
+  # deprecated rhel jsvc_path
+  jsvc_path = "/usr/libexec/bigtop-utils"
+else:
+  jsvc_path = "/usr/lib/bigtop-utils"
+
+hadoop_heapsize = config['configurations']['hadoop-env']['hadoop_heapsize']
+namenode_heapsize = config['configurations']['hadoop-env']['namenode_heapsize']
+namenode_opt_newsize = config['configurations']['hadoop-env']['namenode_opt_newsize']
+namenode_opt_maxnewsize = config['configurations']['hadoop-env']['namenode_opt_maxnewsize']
+namenode_opt_permsize = format_jvm_option("/configurations/hadoop-env/namenode_opt_permsize","128m")
+namenode_opt_maxpermsize = format_jvm_option("/configurations/hadoop-env/namenode_opt_maxpermsize","256m")
+
+jtnode_opt_newsize = "200m"
+jtnode_opt_maxnewsize = "200m"
+jtnode_heapsize =  "1024m"
+ttnode_heapsize = "1024m"
+
+dtnode_heapsize = config['configurations']['hadoop-env']['dtnode_heapsize']
+mapred_pid_dir_prefix = default("/configurations/mapred-env/mapred_pid_dir_prefix","/var/run/hadoop-mapreduce")
+mapred_log_dir_prefix = default("/configurations/mapred-env/mapred_log_dir_prefix","/var/log/hadoop-mapreduce")
+
+# ranger host
+ranger_admin_hosts = default("/clusterHostInfo/ranger_admin_hosts", [])
+has_ranger_admin = not len(ranger_admin_hosts) == 0
+
+if hdp_stack_version != "" and compare_versions(hdp_stack_version, '2.2') >= 0:
+  # setting flag value for ranger hdfs plugin
+  enable_ranger_hdfs = False
+  ranger_plugin_enable = default("/configurations/ranger-hdfs-plugin-properties/ranger-hdfs-plugin-enabled", "no")
+  if ranger_plugin_enable.lower() == 'yes':
+    enable_ranger_hdfs = True
+  elif ranger_plugin_enable.lower() == 'no':
+    enable_ranger_hdfs = False
+
+ambari_server_hostname = config['clusterHostInfo']['ambari_server_host'][0]
+
+#ranger hdfs properties
+policymgr_mgr_url = default("/configurations/admin-properties/policymgr_external_url", "http://localhost:6080")
+sql_connector_jar = default("/configurations/admin-properties/SQL_CONNECTOR_JAR", "/usr/share/java/mysql-connector-java.jar")
+xa_audit_db_flavor = default("/configurations/admin-properties/DB_FLAVOR", "MYSQL")
+xa_audit_db_name = default("/configurations/admin-properties/audit_db_name", "ranger_audit")
+xa_audit_db_user = default("/configurations/admin-properties/audit_db_user", "rangerlogger")
+xa_audit_db_password = default("/configurations/admin-properties/audit_db_password", "rangerlogger")
+xa_db_host = default("/configurations/admin-properties/db_host", "localhost")
+repo_name = str(config['clusterName']) + '_hadoop'
+db_enabled = default("/configurations/ranger-hdfs-plugin-properties/XAAUDIT.DB.IS_ENABLED", "false")
+hdfs_enabled = default("/configurations/ranger-hdfs-plugin-properties/XAAUDIT.HDFS.IS_ENABLED", "false")
+hdfs_dest_dir = default("/configurations/ranger-hdfs-plugin-properties/XAAUDIT.HDFS.DESTINATION_DIRECTORY", "hdfs://__REPLACE__NAME_NODE_HOST:8020/ranger/audit/app-type/time:yyyyMMdd")
+hdfs_buffer_dir = default("/configurations/ranger-hdfs-plugin-properties/XAAUDIT.HDFS.LOCAL_BUFFER_DIRECTORY", "__REPLACE__LOG_DIR/hadoop/app-type/audit")
+hdfs_archive_dir = default("/configurations/ranger-hdfs-plugin-properties/XAAUDIT.HDFS.LOCAL_ARCHIVE_DIRECTORY", "__REPLACE__LOG_DIR/hadoop/app-type/audit/archive")
+hdfs_dest_file = default("/configurations/ranger-hdfs-plugin-properties/XAAUDIT.HDFS.DESTINTATION_FILE", "hostname-audit.log")
+hdfs_dest_flush_int_sec = default("/configurations/ranger-hdfs-plugin-properties/XAAUDIT.HDFS.DESTINTATION_FLUSH_INTERVAL_SECONDS", "900")
+hdfs_dest_rollover_int_sec = default("/configurations/ranger-hdfs-plugin-properties/XAAUDIT.HDFS.DESTINTATION_ROLLOVER_INTERVAL_SECONDS", "86400")
+hdfs_dest_open_retry_int_sec = default("/configurations/ranger-hdfs-plugin-properties/XAAUDIT.HDFS.DESTINTATION_OPEN_RETRY_INTERVAL_SECONDS", "60")
+hdfs_buffer_file = default("/configurations/ranger-hdfs-plugin-properties/XAAUDIT.HDFS.LOCAL_BUFFER_FILE", "time:yyyyMMdd-HHmm.ss.log")
+hdfs_buffer_flush_int_sec = default("/configurations/ranger-hdfs-plugin-properties/XAAUDIT.HDFS.LOCAL_BUFFER_FLUSH_INTERVAL_SECONDS", "60")
+hdfs_buffer_rollover_int_sec = default("/configurations/ranger-hdfs-plugin-properties/XAAUDIT.HDFS.LOCAL_BUFFER_ROLLOVER_INTERVAL_SECONDS", "600")
+hdfs_archive_max_file_count = default("/configurations/ranger-hdfs-plugin-properties/XAAUDIT.HDFS.LOCAL_ARCHIVE_MAX_FILE_COUNT", "10")
+ssl_keystore_file = default("/configurations/ranger-hdfs-plugin-properties/SSL_KEYSTORE_FILE_PATH", "/etc/hadoop/conf/ranger-plugin-keystore.jks")
+ssl_keystore_password = default("/configurations/ranger-hdfs-plugin-properties/SSL_KEYSTORE_PASSWORD", "myKeyFilePassword")
+ssl_truststore_file = default("/configurations/ranger-hdfs-plugin-properties/SSL_TRUSTSTORE_FILE_PATH", "/etc/hadoop/conf/ranger-plugin-truststore.jks")
+ssl_truststore_password = default("/configurations/ranger-hdfs-plugin-properties/SSL_TRUSTSTORE_PASSWORD", "changeit")
+
+hadoop_security_authentication = config['configurations']['core-site']['hadoop.security.authentication']
+hadoop_security_authorization = config['configurations']['core-site']['hadoop.security.authorization']
+fs_default_name = config['configurations']['core-site']['fs.defaultFS']
+hadoop_security_auth_to_local = config['configurations']['core-site']['hadoop.security.auth_to_local']
+hadoop_rpc_protection = default("/configurations/ranger-hdfs-plugin-properties/hadoop.rpc.protection", "-")
+common_name_for_certificate = default("/configurations/ranger-hdfs-plugin-properties/common.name.for.certificate", "-")
+
+repo_config_username = default("/configurations/ranger-hdfs-plugin-properties/REPOSITORY_CONFIG_USERNAME", "hadoop")
+repo_config_password = default("/configurations/ranger-hdfs-plugin-properties/REPOSITORY_CONFIG_PASSWORD", "hadoop")
+
+if security_enabled:
+  _sn_principal_name = default("/configurations/hdfs-site/dfs.secondary.namenode.kerberos.principal", "nn/_HOST@EXAMPLE.COM")
+  _sn_principal_name = _sn_principal_name.replace('_HOST',hostname.lower())
+
+admin_uname = default("/configurations/ranger-env/admin_username", "admin")
+admin_password = default("/configurations/ranger-env/admin_password", "admin")
+admin_uname_password = format("{admin_uname}:{admin_password}")
+
+ambari_ranger_admin = default("/configurations/ranger-env/ranger_admin_username", "amb_ranger_admin")
+ambari_ranger_password = default("/configurations/ranger-env/ranger_admin_password", "ambari123")
+policy_user = default("/configurations/ranger-hdfs-plugin-properties/policy_user", "ambari-qa")
+
+#For curl command in ranger plugin to get db connector
+jdk_location = config['hostLevelParams']['jdk_location']
+java_share_dir = '/usr/share/java'
+if xa_audit_db_flavor and xa_audit_db_flavor.lower() == 'mysql':
+  jdbc_symlink_name = "mysql-jdbc-driver.jar"
+  jdbc_jar_name = "mysql-connector-java.jar"
+elif xa_audit_db_flavor and xa_audit_db_flavor.lower() == 'oracle':
+  jdbc_jar_name = "ojdbc6.jar"
+  jdbc_symlink_name = "oracle-jdbc-driver.jar"
+
+downloaded_custom_connector = format("{tmp_dir}/{jdbc_jar_name}")
+
+driver_curl_source = format("{jdk_location}/{jdbc_symlink_name}")
+driver_curl_target = format("{java_share_dir}/{jdbc_jar_name}")

+ 2 - 0
ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/params.py → ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_windows.py

@@ -19,6 +19,7 @@ limitations under the License.
 
 from resource_management import *
 import os
+from status_params import *
 
 config = Script.get_config()
 hadoop_conf_dir = os.environ["HADOOP_CONF_DIR"]
@@ -56,3 +57,4 @@ hdfs_user = "hadoop"
 grep_exe = "findstr"
 
 name_node_params = default("/commandParams/namenode", None)
+exclude_packages = []

+ 37 - 0
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/service_check.py

@@ -18,9 +18,15 @@ limitations under the License.
 """
 
 from resource_management import *
+from ambari_commons.os_family_impl import OsFamilyImpl
+from ambari_commons import OSConst
 
 
 class HdfsServiceCheck(Script):
+  pass
+
+@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
+class HdfsServiceCheckDefault(HdfsServiceCheck):
   def service_check(self, env):
     import params
 
@@ -114,6 +120,37 @@ class HdfsServiceCheck(Script):
                 tries=5
         )
 
+@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
+class HdfsServiceCheckWindows(HdfsServiceCheck):
+  def service_check(self, env):
+    import params
+    env.set_params(params)
+
+    unique = functions.get_unique_id_and_date()
+
+    #Hadoop uses POSIX-style paths, separator is always /
+    dir = '/tmp'
+    tmp_file = dir + '/' + unique
+
+    #commands for execution
+    hadoop_cmd = "cmd /C %s" % (os.path.join(params.hadoop_home, "bin", "hadoop.cmd"))
+    create_dir_cmd = "%s fs -mkdir %s" % (hadoop_cmd, dir)
+    own_dir = "%s fs -chmod 777 %s" % (hadoop_cmd, dir)
+    test_dir_exists = "%s fs -test -e %s" % (hadoop_cmd, dir)
+    cleanup_cmd = "%s fs -rm %s" % (hadoop_cmd, tmp_file)
+    create_file_cmd = "%s fs -put %s %s" % (hadoop_cmd, os.path.join(params.hadoop_conf_dir, "core-site.xml"), tmp_file)
+    test_cmd = "%s fs -test -e %s" % (hadoop_cmd, tmp_file)
+
+    hdfs_cmd = "cmd /C %s" % (os.path.join(params.hadoop_home, "bin", "hdfs.cmd"))
+    safemode_command = "%s dfsadmin -safemode get | %s OFF" % (hdfs_cmd, params.grep_exe)
+
+    Execute(safemode_command, logoutput=True, try_sleep=3, tries=20)
+    Execute(create_dir_cmd, user=params.hdfs_user,logoutput=True, ignore_failures=True)
+    Execute(own_dir, user=params.hdfs_user,logoutput=True)
+    Execute(test_dir_exists, user=params.hdfs_user,logoutput=True)
+    Execute(create_file_cmd, user=params.hdfs_user,logoutput=True)
+    Execute(test_cmd, user=params.hdfs_user,logoutput=True)
+    Execute(cleanup_cmd, user=params.hdfs_user,logoutput=True)
 
 if __name__ == "__main__":
   HdfsServiceCheck().execute()

+ 21 - 22
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/snamenode.py

@@ -23,51 +23,47 @@ from resource_management.libraries.functions.security_commons import build_expec
   FILE_TYPE_XML
 from hdfs_snamenode import snamenode
 from hdfs import hdfs
-
+from ambari_commons.os_family_impl import OsFamilyImpl
+from ambari_commons import OSConst
 
 class SNameNode(Script):
-
-  def get_stack_to_component(self):
-    return {"HDP": "hadoop-hdfs-secondarynamenode"}
-
   def install(self, env):
     import params
-
     env.set_params(params)
-
     self.install_packages(env, params.exclude_packages)
 
-  def pre_rolling_restart(self, env):
-    # Secondary namenode is actually removed in an HA cluster, which is a pre-requisite for Rolling Upgrade,
-    # so it does not need any Rolling Restart logic.
-    pass
+  def configure(self, env):
+    import params
+    env.set_params(params)
+    hdfs("secondarynamenode")
+    snamenode(action="configure")
 
   def start(self, env, rolling_restart=False):
     import params
     env.set_params(params)
-
     self.configure(env)
     snamenode(action="start")
 
   def stop(self, env, rolling_restart=False):
     import params
     env.set_params(params)
-
     snamenode(action="stop")
 
-  def configure(self, env):
-    import params
-
-    env.set_params(params)
-    hdfs()
-    snamenode(action="configure")
-
   def status(self, env):
     import status_params
-
     env.set_params(status_params)
+    snamenode(action="status")
 
-    check_process_status(status_params.snamenode_pid_file)
+@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
+class SNameNodeDefault(SNameNode):
+
+  def get_stack_to_component(self):
+    return {"HDP": "hadoop-hdfs-secondarynamenode"}
+
+  def pre_rolling_restart(self, env):
+    # Secondary namenode is actually removed in an HA cluster, which is a pre-requisite for Rolling Upgrade,
+    # so it does not need any Rolling Restart logic.
+    pass
 
   def security_status(self, env):
     import status_params
@@ -129,6 +125,9 @@ class SNameNode(Script):
     else:
       self.put_structured_out({"securityState": "UNSECURED"})
 
+@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
+class SNameNodeWindows(SNameNode):
+  pass
 
 if __name__ == "__main__":
   SNameNode().execute()

+ 24 - 17
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/status_params.py

@@ -18,24 +18,31 @@ limitations under the License.
 """
 
 from resource_management import *
+from ambari_commons import OSCheck
 
 config = Script.get_config()
 
-hadoop_pid_dir_prefix = config['configurations']['hadoop-env']['hadoop_pid_dir_prefix']
-hdfs_user = config['configurations']['hadoop-env']['hdfs_user']
-hdp_pid_dir = format("{hadoop_pid_dir_prefix}/{hdfs_user}")
-datanode_pid_file = format("{hdp_pid_dir}/hadoop-{hdfs_user}-datanode.pid")
-namenode_pid_file = format("{hdp_pid_dir}/hadoop-{hdfs_user}-namenode.pid")
-snamenode_pid_file = format("{hdp_pid_dir}/hadoop-{hdfs_user}-secondarynamenode.pid")
-journalnode_pid_file = format("{hdp_pid_dir}/hadoop-{hdfs_user}-journalnode.pid")
-zkfc_pid_file = format("{hdp_pid_dir}/hadoop-{hdfs_user}-zkfc.pid")
-nfsgateway_pid_file = format("{hadoop_pid_dir_prefix}/root/hadoop_privileged_nfs3.pid")
+if OSCheck.is_windows_family():
+  namenode_win_service_name = "namenode"
+  datanode_win_service_name = "datanode"
+  snamenode_win_service_name = "secondarynamenode"
+  journalnode_win_service_name = "journalnode"
+else:
+  hadoop_pid_dir_prefix = config['configurations']['hadoop-env']['hadoop_pid_dir_prefix']
+  hdfs_user = config['configurations']['hadoop-env']['hdfs_user']
+  hdp_pid_dir = format("{hadoop_pid_dir_prefix}/{hdfs_user}")
+  datanode_pid_file = format("{hdp_pid_dir}/hadoop-{hdfs_user}-datanode.pid")
+  namenode_pid_file = format("{hdp_pid_dir}/hadoop-{hdfs_user}-namenode.pid")
+  snamenode_pid_file = format("{hdp_pid_dir}/hadoop-{hdfs_user}-secondarynamenode.pid")
+  journalnode_pid_file = format("{hdp_pid_dir}/hadoop-{hdfs_user}-journalnode.pid")
+  zkfc_pid_file = format("{hdp_pid_dir}/hadoop-{hdfs_user}-zkfc.pid")
+  nfsgateway_pid_file = format("{hadoop_pid_dir_prefix}/root/hadoop_privileged_nfs3.pid")
 
-# Security related/required params
-hostname = config['hostname']
-security_enabled = config['configurations']['cluster-env']['security_enabled']
-hdfs_user_principal = config['configurations']['hadoop-env']['hdfs_principal_name']
-hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab']
-hadoop_conf_dir = "/etc/hadoop/conf"
-kinit_path_local = functions.get_kinit_path()
-tmp_dir = Script.get_tmp_dir()
+  # Security related/required params
+  hostname = config['hostname']
+  security_enabled = config['configurations']['cluster-env']['security_enabled']
+  hdfs_user_principal = config['configurations']['hadoop-env']['hdfs_principal_name']
+  hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab']
+  hadoop_conf_dir = "/etc/hadoop/conf"
+  kinit_path_local = functions.get_kinit_path()
+  tmp_dir = Script.get_tmp_dir()

+ 27 - 9
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/zkfc_slave.py

@@ -24,14 +24,23 @@ from resource_management.libraries.functions.security_commons import build_expec
   FILE_TYPE_XML
 import utils  # this is needed to avoid a circular dependency since utils.py calls this class
 from hdfs import hdfs
-
+from ambari_commons.os_family_impl import OsFamilyImpl
+from ambari_commons import OSConst
 
 class ZkfcSlave(Script):
   def install(self, env):
     import params
-
+    env.set_params(params)
     self.install_packages(env, params.exclude_packages)
+
+  def configure(self, env):
+    import params
     env.set_params(params)
+    hdfs()
+    pass
+
+@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
+class ZkfcSlaveDefault(ZkfcSlave):
 
   def start(self, env, rolling_restart=False):
     import params
@@ -68,22 +77,15 @@ class ZkfcSlave(Script):
       create_log_dir=True
     )
 
-  def configure(self, env):
-    hdfs()
-    pass
 
   def status(self, env):
     import status_params
-
     env.set_params(status_params)
-
     check_process_status(status_params.zkfc_pid_file)
 
   def security_status(self, env):
     import status_params
-
     env.set_params(status_params)
-
     props_value_check = {"hadoop.security.authentication": "kerberos",
                          "hadoop.security.authorization": "true"}
     props_empty_check = ["hadoop.security.auth_to_local"]
@@ -144,5 +146,21 @@ def initialize_ha_zookeeper(params):
     Logger.error('HA state initialization in ZooKeeper threw an exception. Reason %s' %(str(ex)))
   return False
 
+@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
+class ZkfcSlaveWindows(ZkfcSlave):
+  def start(self, env):
+    import params
+    self.configure(env)
+    Service(params.zkfc_win_service_name, action="start")
+
+  def stop(self, env):
+    import params
+    Service(params.zkfc_win_service_name, action="stop")
+
+  def status(self, env):
+    import status_params
+    env.set_params(status_params)
+    check_windows_service_status(status_params.zkfc_win_service_name)
+
 if __name__ == "__main__":
   ZkfcSlave().execute()

+ 0 - 49
ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/datanode.py

@@ -1,49 +0,0 @@
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-
-"""
-
-from resource_management import *
-from hdfs import hdfs
-import service_mapping
-
-class DataNode(Script):
-  def install(self, env):
-
-    if not check_windows_service_exists(service_mapping.datanode_win_service_name):
-      self.install_packages(env)
-
-  def start(self, env):
-    import params
-    self.configure(env)
-    Service(service_mapping.datanode_win_service_name, action="start")
-
-  def stop(self, env):
-    import params
-    env.set_params(params)
-    Service(service_mapping.datanode_win_service_name, action="stop")
-
-  def configure(self, env):
-    import params
-    env.set_params(params)
-    hdfs("datanode")
-
-  def status(self, env):
-    check_windows_service_status(service_mapping.datanode_win_service_name)
-
-if __name__ == "__main__":
-  DataNode().execute()

+ 0 - 54
ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/hdfs.py

@@ -1,54 +0,0 @@
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-
-Ambari Agent
-
-"""
-
-from resource_management import *
-import os
-
-def hdfs(component=None):
-  import params
-  if component == "namenode":
-    directories = params.dfs_name_dir.split(",")
-    Directory(directories,
-              owner=params.hdfs_user,
-              mode="(OI)(CI)F",
-              recursive=True
-    )
-    File(params.exclude_file_path,
-         content=Template("exclude_hosts_list.j2"),
-         owner=params.hdfs_user,
-         mode="f",
-    )
-  if "hadoop-policy" in params.config['configurations']:
-    XmlConfig("hadoop-policy.xml",
-              conf_dir=params.hadoop_conf_dir,
-              configurations=params.config['configurations']['hadoop-policy'],
-              owner=params.hdfs_user,
-              mode="f",
-              configuration_attributes=params.config['configuration_attributes']['hadoop-policy']
-    )
-
-  XmlConfig("hdfs-site.xml",
-            conf_dir=params.hadoop_conf_dir,
-            configurations=params.config['configurations']['hdfs-site'],
-            owner=params.hdfs_user,
-            mode="f",
-            configuration_attributes=params.config['configuration_attributes']['hdfs-site']
-  )

+ 0 - 41
ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/hdfs_client.py

@@ -1,41 +0,0 @@
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-
-"""
-
-from resource_management import *
-from hdfs import hdfs
-
-
-class HdfsClient(Script):
-  def install(self, env):
-    # client checks env var to determine if it is installed
-    if not os.environ.has_key("HADOOP_CONF_DIR"):
-      self.install_packages(env)
-    self.configure(env)
-
-  def status(self, env):
-    raise ClientComponentHasNoStatus()
-
-  def configure(self, env):
-    import params
-    env.set_params(params)
-    hdfs()
-
-
-if __name__ == "__main__":
-  HdfsClient().execute()

+ 0 - 130
ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/hdfs_rebalance.py

@@ -1,130 +0,0 @@
-#!/usr/bin/env python
-
-'''
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-'''
-
-import re
-
-class HdfsParser():
-  def __init__(self):
-    self.initialLine = None
-    self.state = None
-
-  def parseLine(self, line):
-    hdfsLine = HdfsLine()
-    type, matcher = hdfsLine.recognizeType(line)
-    if(type == HdfsLine.LineType.HeaderStart):
-      self.state = 'PROCESS_STARTED'
-    elif (type == HdfsLine.LineType.Progress):
-      self.state = 'PROGRESS'
-      hdfsLine.parseProgressLog(line, matcher)
-      if(self.initialLine == None): self.initialLine = hdfsLine
-
-      return hdfsLine
-    elif (type == HdfsLine.LineType.ProgressEnd):
-      self.state = 'PROCESS_FINISED'
-    return None
-
-class HdfsLine():
-
-  class LineType:
-    HeaderStart, Progress, ProgressEnd, Unknown = range(4)
-
-
-  MEMORY_SUFFIX = ['B','KB','MB','GB','TB','PB','EB']
-  MEMORY_PATTERN = '(?P<memmult_%d>(?P<memory_%d>(\d+)(.|,)?(\d+)?) (?P<mult_%d>'+"|".join(MEMORY_SUFFIX)+'))'
-
-  HEADER_BEGIN_PATTERN = re.compile('Time Stamp\w+Iteration#\w+Bytes Already Moved\w+Bytes Left To Move\w+Bytes Being Moved')
-  PROGRESS_PATTERN = re.compile(
-                            "(?P<date>.*?)\s+" +
-                            "(?P<iteration>\d+)\s+" +
-                            MEMORY_PATTERN % (1,1,1) + "\s+" +
-                            MEMORY_PATTERN % (2,2,2) + "\s+" +
-                            MEMORY_PATTERN % (3,3,3)
-                            )
-  PROGRESS_END_PATTERN = re.compile('(The cluster is balanced. Exiting...|The cluster is balanced. Exiting...)')
-
-  def __init__(self):
-    self.date = None
-    self.iteration = None
-    self.bytesAlreadyMoved = None
-    self.bytesLeftToMove = None
-    self.bytesBeingMoved = None
-    self.bytesAlreadyMovedStr = None
-    self.bytesLeftToMoveStr = None
-    self.bytesBeingMovedStr = None
-
-  def recognizeType(self, line):
-    for (type, pattern) in (
-                            (HdfsLine.LineType.HeaderStart, self.HEADER_BEGIN_PATTERN),
-                            (HdfsLine.LineType.Progress, self.PROGRESS_PATTERN),
-                            (HdfsLine.LineType.ProgressEnd, self.PROGRESS_END_PATTERN)
-                            ):
-      m = re.match(pattern, line)
-      if m:
-        return type, m
-    return HdfsLine.LineType.Unknown, None
-
-  def parseProgressLog(self, line, m):
-    '''
-    Parse the line of 'hdfs rebalancer' output. The example output being parsed:
-
-    Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved
-    Jul 28, 2014 5:01:49 PM           0                  0 B             5.74 GB            9.79 GB
-    Jul 28, 2014 5:03:00 PM           1                  0 B             5.58 GB            9.79 GB
-
-    Throws AmbariException in case of parsing errors
-
-    '''
-    m = re.match(self.PROGRESS_PATTERN, line)
-    if m:
-      self.date = m.group('date')
-      self.iteration = int(m.group('iteration'))
-
-      self.bytesAlreadyMoved = self.parseMemory(m.group('memory_1'), m.group('mult_1'))
-      self.bytesLeftToMove = self.parseMemory(m.group('memory_2'), m.group('mult_2'))
-      self.bytesBeingMoved = self.parseMemory(m.group('memory_3'), m.group('mult_3'))
-
-      self.bytesAlreadyMovedStr = m.group('memmult_1')
-      self.bytesLeftToMoveStr = m.group('memmult_2')
-      self.bytesBeingMovedStr = m.group('memmult_3')
-    else:
-      raise AmbariException("Failed to parse line [%s]")
-
-  def parseMemory(self, memorySize, multiplier_type):
-    try:
-      factor = self.MEMORY_SUFFIX.index(multiplier_type)
-    except ValueError:
-      raise AmbariException("Failed to memory value [%s %s]" % (memorySize, multiplier_type))
-
-    return float(memorySize) * (1024 ** factor)
-  def toJson(self):
-    return {
-            'timeStamp' : self.date,
-            'iteration' : self.iteration,
-
-            'dataMoved': self.bytesAlreadyMovedStr,
-            'dataLeft' : self.bytesLeftToMoveStr,
-            'dataBeingMoved': self.bytesBeingMovedStr,
-
-            'bytesMoved': self.bytesAlreadyMoved,
-            'bytesLeft' : self.bytesLeftToMove,
-            'bytesBeingMoved': self.bytesBeingMoved,
-          }
-  def __str__(self):
-    return "[ date=%s,iteration=%d, bytesAlreadyMoved=%d, bytesLeftToMove=%d, bytesBeingMoved=%d]"%(self.date, self.iteration, self.bytesAlreadyMoved, self.bytesLeftToMove, self.bytesBeingMoved)

+ 0 - 48
ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/journalnode.py

@@ -1,48 +0,0 @@
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-
-"""
-
-from resource_management import *
-from hdfs import hdfs
-import service_mapping
-
-class JournalNode(Script):
-  def install(self, env):
-    if not check_windows_service_exists(service_mapping.journalnode_win_service_name):
-      self.install_packages(env)
-
-  def start(self, env):
-    import params
-    self.configure(env)
-    Service(service_mapping.journalnode_win_service_name, action="start")
-
-  def stop(self, env):
-    import params
-    Service(service_mapping.journalnode_win_service_name, action="stop")
-
-  def configure(self, env):
-    import params
-    env.set_params(params)
-    hdfs()
-    pass
-
-  def status(self, env):
-    check_windows_service_status(service_mapping.journalnode_win_service_name)
-
-if __name__ == "__main__":
-  JournalNode().execute()

+ 0 - 128
ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/namenode.py

@@ -1,128 +0,0 @@
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-
-"""
-
-from resource_management import *
-from hdfs import hdfs
-import service_mapping
-import hdfs_rebalance
-import time
-import json
-import subprocess
-import sys
-import os
-from datetime import datetime
-from ambari_commons.os_windows import *
-
-class NameNode(Script):
-  def install(self, env):
-    if not check_windows_service_exists(service_mapping.namenode_win_service_name):
-      self.install_packages(env)
-
-    import params
-    self.configure(env)
-    namenode_format_marker = os.path.join(params.hadoop_conf_dir,"NN_FORMATTED")
-    if not os.path.exists(namenode_format_marker):
-      hadoop_cmd = "cmd /C %s" % (os.path.join(params.hadoop_home, "bin", "hadoop.cmd"))
-      Execute("%s namenode -format" % (hadoop_cmd))
-      open(namenode_format_marker, 'a').close()
-
-  def start(self, env):
-    self.configure(env)
-    Service(service_mapping.namenode_win_service_name, action="start")
-
-  def stop(self, env):
-    Service(service_mapping.namenode_win_service_name, action="stop")
-
-  def configure(self, env):
-    import params
-    env.set_params(params)
-    hdfs("namenode")
-
-  def status(self, env):
-    check_windows_service_status(service_mapping.namenode_win_service_name)
-    pass
-
-  def decommission(self, env):
-    import params
-
-    env.set_params(params)
-    hdfs_user = params.hdfs_user
-    conf_dir = params.hadoop_conf_dir
-
-    File(params.exclude_file_path,
-         content=Template("exclude_hosts_list.j2"),
-         owner=hdfs_user
-    )
-
-    if params.dfs_ha_enabled:
-      # due to a bug in hdfs, refreshNodes will not run on both namenodes so we
-      # need to execute each command scoped to a particular namenode
-      nn_refresh_cmd = format('cmd /c hadoop dfsadmin -fs hdfs://{namenode_rpc} -refreshNodes')
-    else:
-      nn_refresh_cmd = format('cmd /c hadoop dfsadmin -refreshNodes')
-    Execute(nn_refresh_cmd, user=hdfs_user)
-
-
-  def rebalancehdfs(self, env):
-    import params
-    env.set_params(params)
-
-    hdfs_user = params.hdfs_user
-
-    name_node_parameters = json.loads( params.name_node_params )
-    threshold = name_node_parameters['threshold']
-    _print("Starting balancer with threshold = %s\n" % threshold)
-
-    def calculateCompletePercent(first, current):
-      return 1.0 - current.bytesLeftToMove/first.bytesLeftToMove
-
-    def startRebalancingProcess(threshold):
-      rebalanceCommand = 'hdfs balancer -threshold %s' % threshold
-      return ['cmd', '/C', rebalanceCommand]
-
-    command = startRebalancingProcess(threshold)
-    basedir = os.path.join(env.config.basedir, 'scripts')
-
-    _print("Executing command %s\n" % command)
-
-    parser = hdfs_rebalance.HdfsParser()
-    returncode, stdout, err = run_os_command_impersonated(' '.join(command), hdfs_user, Script.get_password(hdfs_user))
-
-    for line in stdout.split('\n'):
-      _print('[balancer] %s %s' % (str(datetime.now()), line ))
-      pl = parser.parseLine(line)
-      if pl:
-        res = pl.toJson()
-        res['completePercent'] = calculateCompletePercent(parser.initialLine, pl)
-
-        self.put_structured_out(res)
-      elif parser.state == 'PROCESS_FINISED' :
-        _print('[balancer] %s %s' % (str(datetime.now()), 'Process is finished' ))
-        self.put_structured_out({'completePercent' : 1})
-        break
-
-    if returncode != None and returncode != 0:
-      raise Fail('Hdfs rebalance process exited with error. See the log output')
-
-def _print(line):
-  sys.stdout.write(line)
-  sys.stdout.flush()
-
-if __name__ == "__main__":
-  NameNode().execute()

+ 0 - 55
ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/service_check.py

@@ -1,55 +0,0 @@
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-
-"""
-
-from resource_management import *
-from resource_management.libraries import functions
-
-class HdfsServiceCheck(Script):
-  def service_check(self, env):
-    import params
-    env.set_params(params)
-
-    unique = functions.get_unique_id_and_date()
-
-    #Hadoop uses POSIX-style paths, separator is always /
-    dir = '/tmp'
-    tmp_file = dir + '/' + unique
-
-    #commands for execution
-    hadoop_cmd = "cmd /C %s" % (os.path.join(params.hadoop_home, "bin", "hadoop.cmd"))
-    create_dir_cmd = "%s fs -mkdir %s" % (hadoop_cmd, dir)
-    own_dir = "%s fs -chmod 777 %s" % (hadoop_cmd, dir)
-    test_dir_exists = "%s fs -test -e %s" % (hadoop_cmd, dir)
-    cleanup_cmd = "%s fs -rm %s" % (hadoop_cmd, tmp_file)
-    create_file_cmd = "%s fs -put %s %s" % (hadoop_cmd, os.path.join(params.hadoop_conf_dir, "core-site.xml"), tmp_file)
-    test_cmd = "%s fs -test -e %s" % (hadoop_cmd, tmp_file)
-
-    hdfs_cmd = "cmd /C %s" % (os.path.join(params.hadoop_home, "bin", "hdfs.cmd"))
-    safemode_command = "%s dfsadmin -safemode get | %s OFF" % (hdfs_cmd, params.grep_exe)
-
-    Execute(safemode_command, logoutput=True, try_sleep=3, tries=20)
-    Execute(create_dir_cmd, user=params.hdfs_user,logoutput=True, ignore_failures=True)
-    Execute(own_dir, user=params.hdfs_user,logoutput=True)
-    Execute(test_dir_exists, user=params.hdfs_user,logoutput=True)
-    Execute(create_file_cmd, user=params.hdfs_user,logoutput=True)
-    Execute(test_cmd, user=params.hdfs_user,logoutput=True)
-    Execute(cleanup_cmd, user=params.hdfs_user,logoutput=True)
-
-if __name__ == "__main__":
-  HdfsServiceCheck().execute()

+ 0 - 24
ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/service_mapping.py

@@ -1,24 +0,0 @@
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-
-"""
-
-namenode_win_service_name = "namenode"
-datanode_win_service_name = "datanode"
-snamenode_win_service_name = "secondarynamenode"
-journalnode_win_service_name = "journalnode"
-zkfc_win_service_name = "zkfc"

+ 0 - 48
ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/snamenode.py

@@ -1,48 +0,0 @@
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-
-"""
-
-from resource_management import *
-from hdfs import hdfs
-import service_mapping
-
-class SNameNode(Script):
-  def install(self, env):
-    if not check_windows_service_exists(service_mapping.snamenode_win_service_name):
-      self.install_packages(env)
-
-  def start(self, env):
-    import params
-    self.configure(env)
-    Service(service_mapping.snamenode_win_service_name, action="start")
-
-  def stop(self, env):
-    import params
-    Service(service_mapping.snamenode_win_service_name, action="stop")
-
-  def configure(self, env):
-    import params
-    env.set_params(params)
-    hdfs("secondarynamenode")
-
-  def status(self, env):
-    import params
-    check_windows_service_status(service_mapping.snamenode_win_service_name)
-
-if __name__ == "__main__":
-  SNameNode().execute()

+ 0 - 51
ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/scripts/zkfc_slave.py

@@ -1,51 +0,0 @@
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-
-"""
-
-from resource_management import *
-from hdfs import hdfs
-import service_mapping
-
-class ZkfcSlave(Script):
-  def install(self, env):
-    if not check_windows_service_exists(service_mapping.zkfc_win_service_name):
-      import params
-      env.set_params(params)
-      self.install_packages(env)
-
-  def start(self, env):
-    import params
-    self.configure(env)
-    Service(service_mapping.zkfc_win_service_name, action="start")
-
-  def stop(self, env):
-    import params
-    Service(service_mapping.zkfc_win_service_name, action="stop")
-
-  def configure(self, env):
-    import params
-    env.set_params(params)
-    hdfs()
-    pass
-
-  def status(self, env):
-    check_windows_service_status(service_mapping.zkfc_win_service_name)
-
-
-if __name__ == "__main__":
-  ZkfcSlave().execute()

+ 0 - 21
ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/HDFS/package/templates/exclude_hosts_list.j2

@@ -1,21 +0,0 @@
-{#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#}
-
-{% for host in hdfs_exclude_file %}
-{{host}}
-{% endfor %}

+ 1 - 0
ambari-server/src/test/python/stacks/2.0.6/HDFS/test_hdfs_client.py

@@ -25,6 +25,7 @@ from resource_management import *
 from stacks.utils.RMFTestCase import *
 
 
+@patch("platform.linux_distribution", new = MagicMock(return_value="Linux"))
 @patch.object(tarfile,"open", new = MagicMock())
 @patch.object(tempfile,"mkdtemp", new = MagicMock(return_value='/tmp/123'))
 @patch.object(contextlib,"closing", new = MagicMock())