
AMBARI-14959: Implement service check for secured PXF service (lavjain via jaoki)

Jun Aoki, 9 years ago
parent commit feb50e3a3f

+ 13 - 4
ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/params.py

@@ -22,6 +22,7 @@ from resource_management import Script
 from resource_management.libraries.functions.default import default
 from resource_management.libraries.functions import get_kinit_path
 from resource_management.libraries.resources.hdfs_resource import HdfsResource
+from resource_management.libraries.functions.namenode_ha_utils import get_active_namenode
 
 config = Script.get_config()
 
@@ -31,9 +32,10 @@ stack_name = str(config["hostLevelParams"]["stack_name"])
 # Users and Groups
 pxf_user = "pxf"
 pxf_group = pxf_user
-hdfs_superuser = config['configurations']['hadoop-env']['hdfs_user']
+hdfs_user = config['configurations']['hadoop-env']['hdfs_user']
 hdfs_superuser_group = config["configurations"]["hdfs-site"]["dfs.permissions.superusergroup"]
 user_group = config["configurations"]["cluster-env"]["user_group"]
+hbase_user = default('configurations/hbase-env/hbase_user', None)
 hive_user = default('configurations/hive-env/hive_user', None)
 tomcat_group = "tomcat"
 
@@ -60,14 +62,21 @@ is_hive_installed = default("/clusterHostInfo/hive_server_host", None) is not No
 # HDFS
 hdfs_site = config['configurations']['hdfs-site']
 default_fs = config['configurations']['core-site']['fs.defaultFS']
+namenode_path =  default('/configurations/hdfs-site/dfs.namenode.http-address', None)
+dfs_nameservice = default('/configurations/hdfs-site/dfs.nameservices', None)
+if dfs_nameservice:
+  namenode_path =  get_active_namenode(hdfs_site, security_enabled, hdfs_user)[1]
 
-hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab']
+# keytabs and principals
 kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None))
-hdfs_principal_name = config['configurations']['hadoop-env']['hdfs_principal_name']
+hdfs_user_keytab = default('configurations/hadoop-env/hdfs_user_keytab', None)
+hdfs_principal_name = default('configurations/hadoop-env/hdfs_principal_name', None)
+hbase_user_keytab = default('configurations/hbase-env/hbase_user_keytab', None)
+hbase_principal_name = default('configurations/hbase-env/hbase_principal_name', None)
 
 # HDFSResource partial function
 HdfsResource = functools.partial(HdfsResource,
-    user=hdfs_superuser,
+    user=hdfs_user,
     security_enabled=security_enabled,
     keytab=hdfs_user_keytab,
     kinit_path_local=kinit_path_local,
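
Note on the params.py change: when dfs.nameservices is set, the statically configured HTTP address may point at a standby NameNode, so the code asks namenode_ha_utils for the active one; as used above, get_active_namenode returns a tuple whose element [1] is the active NameNode's HTTP address. A minimal standalone sketch of that selection logic (the stub and hostnames below are hypothetical, standing in for the real Ambari helper):

# Sketch of the HA-aware NameNode lookup; config values are hypothetical.
def resolve_namenode_http_address(hdfs_site, get_active_namenode):
    # Non-HA clusters expose a static HTTP address.
    namenode_path = hdfs_site.get("dfs.namenode.http-address")
    # When dfs.nameservices is set, HDFS HA is on and the static address
    # may be the standby; ask for the currently active NameNode instead.
    if hdfs_site.get("dfs.nameservices"):
        namenode_path = get_active_namenode(hdfs_site)[1]
    return namenode_path

# Stub standing in for the namenode_ha_utils helper, which returns a
# tuple whose second element is the active NameNode's HTTP address.
stub = lambda conf: ("nn1", "active-nn.example.com:50070")
print(resolve_namenode_http_address({"dfs.nameservices": "mycluster"}, stub))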

+ 63 - 18
ambari-server/src/main/resources/common-services/PXF/3.0.0/package/scripts/service_check.py

@@ -15,15 +15,19 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 """
+import json
+
 from resource_management.libraries.script import Script
 from resource_management.core.exceptions import Fail
 from resource_management.core.logger import Logger
 from resource_management.core.system import System
 from resource_management.core.resources.system import Execute
-
+from resource_management.core.environment import Environment
+from resource_management.libraries.functions.curl_krb_request import curl_krb_request
 from pxf_utils import makeHTTPCall, runLocalCmd
 import pxf_constants
 
+
 class PXFServiceCheck(Script):
   """
   Runs a set of simple PXF tests to verify if the service has been setup correctly
@@ -44,11 +48,19 @@ class PXFServiceCheck(Script):
 
 
   def service_check(self, env):
-    Logger.info("Starting PXF service checks..")
-
+    """
+    Runs the service check for PXF
+    """
     import params
-    self.pxf_version = self.__get_pxf_protocol_version()
+    Logger.info("Starting PXF service checks..")
     try:
+      # Get delegation token if security is enabled
+      if params.security_enabled:
+        token = self.__get_delegation_token(params.hdfs_user, params.hdfs_user_keytab,
+                                            params.hdfs_principal_name, params.kinit_path_local)
+        self.commonPXFHeaders.update({"X-GP-TOKEN": token})
+
+      self.pxf_version = self.__get_pxf_protocol_version()
       self.run_hdfs_tests()
       if params.is_hbase_installed:
         self.run_hbase_tests()
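
With security enabled, every subsequent PXF REST call in the check carries the delegation token through the X-GP-TOKEN entry merged into commonPXFHeaders. Schematically, a secured HDFS read test ends up sending headers like the following (the token and test-directory values are placeholders, not real ones):

# Schematic header set for a secured PXF read test; values are placeholders.
common_pxf_headers = {"X-GP-TOKEN": "HAAFaGRmcw..."}   # delegation token
headers = {
    "X-GP-DATA-DIR": "/user/pxf_service_check_dir",    # placeholder path
    "X-GP-profile": "HdfsTextSimple",
}
headers.update(common_pxf_headers)  # mirrors headers.update(self.commonPXFHeaders)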
@@ -59,7 +71,10 @@ class PXFServiceCheck(Script):
       Logger.error(msg)
       raise Fail(msg)
     finally:
-      self.cleanup_test_data()
+      try:
+        self.cleanup_test_data()
+      except Exception as e:
+        Logger.error(e)
 
     Logger.info("Service check completed successfully")
 
@@ -111,9 +126,28 @@ class PXFServiceCheck(Script):
         raise
     except:
       msg = "PXF data read failed"
+      Logger.error(msg)
       raise Fail(msg)
 
 
+  def __get_delegation_token(self, user, keytab, principal, kinit_path):
+    """
+    Gets the kerberos delegation token from name node
+    """
+    import params
+    url = params.namenode_path + "/webhdfs/v1/?op=GETDELEGATIONTOKEN"
+    Logger.info("Getting delegation token from {0}".format(url))
+    response, _, _  = curl_krb_request(Environment.get_instance().tmp_dir, keytab, principal,
+        url, "get_delegation_token", kinit_path, False, "Delegation Token", user)
+    json_response = json.loads(response)
+    if json_response['Token'] and json_response['Token']['urlString']:
+      return json_response['Token']['urlString']
+
+    msg = "Unable to get delegation token"
+    Logger.error(msg)
+    raise Fail(msg)
+
+
   # HDFS Routines
   def run_hdfs_tests(self):
     """
@@ -136,20 +170,20 @@ class PXFServiceCheck(Script):
         type="directory",
         type="directory",
         action="create_on_execute",
         action="create_on_execute",
         mode=0777
         mode=0777
-        )
-
+    )
     params.HdfsResource(pxf_constants.pxf_hdfs_read_test_file,
     params.HdfsResource(pxf_constants.pxf_hdfs_read_test_file,
         type="file",
         type="file",
         source="/etc/passwd",
         source="/etc/passwd",
         action="create_on_execute"
         action="create_on_execute"
-        )
+    )
+    params.HdfsResource(None, action="execute")
 
 
   def __check_pxf_hdfs_read(self):
   def __check_pxf_hdfs_read(self):
     """
     """
     Reads the test HDFS data through PXF
     Reads the test HDFS data through PXF
     """
     """
     Logger.info("Testing PXF HDFS read")
     Logger.info("Testing PXF HDFS read")
-    headers = { 
+    headers = {
         "X-GP-DATA-DIR": pxf_constants.pxf_hdfs_test_dir,
         "X-GP-DATA-DIR": pxf_constants.pxf_hdfs_test_dir,
         "X-GP-profile": "HdfsTextSimple",
         "X-GP-profile": "HdfsTextSimple",
         }
         }
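
Note the added params.HdfsResource(None, action="execute"): Ambari's HdfsResource defers work, so create_on_execute calls only queue operations, and nothing touches HDFS until an execute action flushes the queue. A toy model of that pattern (the queue and paths here are illustrative, not Ambari's internals):

# Toy model of HdfsResource's deferred execution: declarations queue
# operations; the None/"execute" call flushes the queue in one batch.
queue = []

def HdfsResource(path, **kwargs):
    if kwargs.get("action") == "execute":
        for op in queue:                      # apply everything at once
            print("applying", op)
        del queue[:]
    else:
        queue.append((path, kwargs))          # just record the request

HdfsResource("/tmp/pxf_check", type="directory", action="create_on_execute")
HdfsResource("/tmp/pxf_check/data", type="file", action="create_on_execute")
HdfsResource(None, action="execute")          # nothing ran until this line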
@@ -182,6 +216,7 @@ class PXFServiceCheck(Script):
         raise
     except:
       msg = "PXF HDFS data write test failed"
+      Logger.error(msg)
       raise Fail(msg)
 
   def __cleanup_hdfs_data(self):
@@ -193,11 +228,12 @@ class PXFServiceCheck(Script):
     params.HdfsResource(pxf_constants.pxf_hdfs_read_test_file,
         type="file",
         action="delete_on_execute"
-        )
+    )
     params.HdfsResource(pxf_constants.pxf_hdfs_test_dir,
         type="directory",
         action="delete_on_execute"
-        )
+    )
+    params.HdfsResource(None, action="execute")
 
 
   # HBase Routines
@@ -205,7 +241,11 @@ class PXFServiceCheck(Script):
     """
     """
     Runs a set of PXF HBase checks
     Runs a set of PXF HBase checks
     """
     """
+    import params
     Logger.info("Running PXF HBase checks")
     Logger.info("Running PXF HBase checks")
+    if params.security_enabled:
+      Execute("{0} -kt {1} {2}".format(params.kinit_path_local, params.hbase_user_keytab, params.hbase_principal_name),
+              user = params.hbase_user)
     self.__cleanup_hbase_data()
     self.__cleanup_hbase_data()
     self.__check_if_client_exists("HBase")
     self.__check_if_client_exists("HBase")
     self.__write_hbase_data()
     self.__write_hbase_data()
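
On a secured cluster the HBase checks now kinit as the hbase user first, since hbase shell needs a valid Kerberos ticket. The Execute above expands to a command of roughly this shape (the kinit path, keytab path, and principal below are hypothetical examples, not values from this commit):

# Roughly what the kinit Execute resolves to; all values are hypothetical.
kinit_path_local = "/usr/bin/kinit"
hbase_user_keytab = "/etc/security/keytabs/hbase.headless.keytab"
hbase_principal_name = "hbase-mycluster@EXAMPLE.COM"

cmd = "{0} -kt {1} {2}".format(kinit_path_local, hbase_user_keytab,
                               hbase_principal_name)
print(cmd)  # e.g. /usr/bin/kinit -kt /etc/security/keytabs/hbase.headless.keytab hbase-mycluster@EXAMPLE.COM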
@@ -215,9 +255,12 @@ class PXFServiceCheck(Script):
     """
     """
     Creates a temporary HBase table for the service checks
     Creates a temporary HBase table for the service checks
     """
     """
+    import params
     Logger.info("Creating temporary HBase test data")
     Logger.info("Creating temporary HBase test data")
-    Execute("echo \"create '" + pxf_constants.pxf_hbase_test_table + "', 'cf'\"|hbase shell", logoutput = True)
-    Execute("echo \"put '" + pxf_constants.pxf_hbase_test_table + "', 'row1', 'cf:a', 'value1'; put '" + pxf_constants.pxf_hbase_test_table + "', 'row1', 'cf:b', 'value2'\" | hbase shell", logoutput = True)
+    cmd = "echo \"create '{0}', 'cf'\" | hbase shell".format(pxf_constants.pxf_hbase_test_table)
+    Execute(cmd, logoutput = True, user = params.hbase_user)
+    cmd = "echo \"put '{0}', 'row1', 'cf:a', 'value1'; put '{0}', 'row1', 'cf:b', 'value2'\" | hbase shell".format(pxf_constants.pxf_hbase_test_table)
+    Execute(cmd, logoutput = True, user = params.hbase_user)
 
 
   def __check_pxf_hbase_read(self):
   def __check_pxf_hbase_read(self):
     """
     """
@@ -229,16 +272,18 @@ class PXFServiceCheck(Script):
         "X-GP-profile": "HBase",
         "X-GP-profile": "HBase",
         }
         }
     headers.update(self.commonPXFHeaders)
     headers.update(self.commonPXFHeaders)
-
     self.__check_pxf_read(headers)
     self.__check_pxf_read(headers)
 
 
   def __cleanup_hbase_data(self):
   def __cleanup_hbase_data(self):
     """
     """
     Cleans up the test HBase data
     Cleans up the test HBase data
     """
     """
+    import params
     Logger.info("Cleaning up HBase test data")
     Logger.info("Cleaning up HBase test data")
-    Execute("echo \"disable '" + pxf_constants.pxf_hbase_test_table + "'\"|hbase shell > /dev/null 2>&1", logoutput = True)
-    Execute("echo \"drop '" + pxf_constants.pxf_hbase_test_table + "'\"|hbase shell > /dev/null 2>&1", logoutput = True)
+    cmd = "echo \"disable '{0}'\" | hbase shell > /dev/null 2>&1".format(pxf_constants.pxf_hbase_test_table)
+    Execute(cmd, logoutput = True, user = params.hbase_user)
+    cmd = "echo \"drop '{0}'\" | hbase shell > /dev/null 2>&1".format(pxf_constants.pxf_hbase_test_table)
+    Execute(cmd, logoutput = True, user = params.hbase_user)
 
 
 
 
   # Hive Routines
   # Hive Routines
@@ -259,7 +304,7 @@ class PXFServiceCheck(Script):
     import params
     Logger.info("Creating temporary Hive test data")
     cmd = "hive -e 'CREATE TABLE IF NOT EXISTS {0} (id INT); INSERT INTO {0} VALUES (1);'".format(pxf_constants.pxf_hive_test_table)
-    Execute(cmd, logoutput = True, user = params.hive_user)
+    Execute(cmd, logoutput = True, user = params.hdfs_user)
 
   def __check_pxf_hive_read(self):
     """
@@ -280,7 +325,7 @@ class PXFServiceCheck(Script):
     import params
     Logger.info("Cleaning up Hive test data")
     cmd = "hive -e 'DROP TABLE IF EXISTS {0};'".format(pxf_constants.pxf_hive_test_table)
-    Execute(cmd, logoutput = True, user = params.hive_user)
+    Execute(cmd, logoutput = True, user = params.hdfs_user)
 
 
   # Package Routines