
AMBARI-14732. Extend HAWQ service check to include querying external table through PXF (mithmatt via odiachenko).

Oleksandr Diachenko, 9 years ago
commit 3765a39055

+ 3 - 2
ambari-server/src/main/resources/common-services/HAWQ/2.0.0/package/scripts/hawq_constants.py

@@ -58,8 +58,9 @@ postmaster_opts_filename = "postmaster.opts"
 postmaster_pid_filename = "postmaster.pid"
 hawq_keytab_file = "/etc/security/keytabs/hawq.service.keytab"
 
-# Smoke check table
-smoke_check_table_name = "ambari_hawq_smoke_test"
+# HAWQ-PXF check params
+PXF_PORT = "51200"
+pxf_hdfs_test_dir = "/user/{0}/hawq_pxf_hdfs_service_check".format(hawq_user)
 
 # Timeouts
 default_exec_timeout = 600

+ 28 - 0
ambari-server/src/main/resources/common-services/HAWQ/2.0.0/package/scripts/params.py

@@ -18,6 +18,7 @@ limitations under the License.
 
 import os
 import functools
+from hawq_constants import PXF_PORT, pxf_hdfs_test_dir
 from resource_management import Script
 from resource_management.libraries.functions.default import default
 from resource_management.libraries.resources.hdfs_resource import HdfsResource
@@ -72,6 +73,7 @@ hdfs_principal_name = config['configurations']['hadoop-env']['hdfs_principal_nam
 hadoop_conf_dir = conf_select.get_hadoop_conf_dir()
 hadoop_bin_dir = hadoop_select.get_hadoop_dir("bin")
 execute_path = os.environ['PATH'] + os.pathsep + hadoop_bin_dir
+dfs_nameservice = default('/configurations/hdfs-site/dfs.nameservices', None)
 
 # HDFSResource partial function
 HdfsResource = functools.partial(HdfsResource,
@@ -94,6 +96,32 @@ ExecuteHadoop = functools.partial(ExecuteHadoop,
                                   principal=hdfs_principal_name,
                                   bin_dir=execute_path)
 
+
+# For service Check
+is_pxf_installed = __get_component_host("pxf_hosts") is not None
+namenode_path =  "{0}:{1}".format(__get_component_host("namenode_host"), PXF_PORT) if dfs_nameservice is None else dfs_nameservice
+table_definition = {
+  "HAWQ": {
+    "name": "ambari_hawq_test",
+    "create_type": "",
+    "drop_type": "",
+    "description": "(col1 int) DISTRIBUTED RANDOMLY"
+  },
+  "EXTERNAL_HDFS_READABLE": {
+    "name": "ambari_hawq_pxf_hdfs_readable_test",
+    "create_type": "READABLE EXTERNAL",
+    "drop_type": "EXTERNAL",
+    "description": "(col1 int) LOCATION ('pxf://{0}{1}?PROFILE=HdfsTextSimple') FORMAT 'TEXT'".format(namenode_path, pxf_hdfs_test_dir)
+  },
+  "EXTERNAL_HDFS_WRITABLE": {
+    "name": "ambari_hawq_pxf_hdfs_writable_test",
+    "create_type": "WRITABLE EXTERNAL",
+    "drop_type": "EXTERNAL",
+    "description": "(col1 int) LOCATION ('pxf://{0}{1}?PROFILE=HdfsTextSimple') FORMAT 'TEXT'".format(namenode_path, pxf_hdfs_test_dir)
+  }
+}
+
+
 # YARN
 # Note: YARN is not mandatory for HAWQ. It is required only when the users set HAWQ to use YARN as resource manager
 rm_host = __get_component_host('rm_host')
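
For reference, namenode_path above is what ends up inside the pxf:// LOCATION clause: on a non-HA cluster it is the NameNode host joined with the PXF port (51200), and on an HA cluster the logical dfs.nameservices value is used instead, with no port. A minimal sketch of the resulting LOCATION string under both assumptions (host name, nameservice, and the gpadmin user are made-up example values, not taken from this commit):

# Sketch only: how namenode_path feeds the PXF LOCATION clause (all values assumed).
PXF_PORT = "51200"
pxf_hdfs_test_dir = "/user/gpadmin/hawq_pxf_hdfs_service_check"

def pxf_location(namenode_path):
    return "(col1 int) LOCATION ('pxf://{0}{1}?PROFILE=HdfsTextSimple') FORMAT 'TEXT'".format(
        namenode_path, pxf_hdfs_test_dir)

print(pxf_location("nn01.example.com:" + PXF_PORT))  # non-HA: dfs.nameservices is unset
print(pxf_location("mycluster"))                      # HA: dfs.nameservices value, e.g. "mycluster"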

+ 106 - 38
ambari-server/src/main/resources/common-services/HAWQ/2.0.0/package/scripts/service_check.py

@@ -15,6 +15,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 """
+
+import sys
 import common
 import hawq_constants
 from utils import exec_psql_cmd, exec_ssh_cmd
@@ -22,81 +24,147 @@ from resource_management.libraries.script import Script
 from resource_management.core.exceptions import Fail
 from resource_management.core.logger import Logger
 
-import sys
 
 class HAWQServiceCheck(Script):
   """
-  Runs a set of simple HAWQ tests to verify if the service has been setup correctly
+  Runs a set of HAWQ tests to verify if the service has been setup correctly
   """
 
   def __init__(self):
     self.active_master_host = common.get_local_hawq_site_property("hawq_master_address_host")
+    self.checks_failed = 0
+    self.total_checks = 3
 
 
   def service_check(self, env):
-    Logger.info("Starting HAWQ service checks..")
-    # All the tests are run on the active_master_host using ssh irrespective of the node on which service check
-    # is executed by Ambari
+    """
+    Runs service check for HAWQ.
+    """
+    import params
+
+    # Checks HAWQ cluster state
+    self.check_state()
+
+    # Runs check for writing and reading tables on HAWQ
+    self.check_hawq()
+
+    # Runs check for writing and reading external tables on HDFS using PXF, if PXF is installed
+    if params.is_pxf_installed:
+      self.check_hawq_pxf_hdfs()
+    else:
+      Logger.info("PXF not installed. Skipping HAWQ-PXF checks...")
+
+    if self.checks_failed != 0:
+      Logger.error("** FAILURE **: Service check failed {0} of {1} checks".format(self.checks_failed, self.total_checks))
+      sys.exit(1)
+
+    Logger.info("Service check completed successfully")
+
+
+  def check_state(self):
+    """
+    Checks state of HAWQ cluster
+    """
+    import params
+    Logger.info("--- Check state of HAWQ cluster ---")
     try:
-      self.check_state()
-      self.drop_table()
-      self.create_table()
-      self.insert_data()
-      self.query_data()
-      self.check_data_correctness()
+      command = "source {0} && hawq state -d {1}".format(hawq_constants.hawq_greenplum_path_file, params.hawq_master_dir)
+      Logger.info("Executing hawq status check...")
+      (retcode, out, err) = exec_ssh_cmd(self.active_master_host, command)
+      if retcode:
+        Logger.error("SERVICE CHECK FAILED: hawq state command returned non-zero result: {0}. Out: {1} Error: {2}".format(retcode, out, err))
+        raise Fail("Unexpected result of hawq state command.")
+      Logger.info("Output of command:\n{0}".format(str(out) + "\n"))
     except:
-      Logger.error("Service check failed")
-      sys.exit(1)
+      self.checks_failed += 1
+
+
+  def check_hawq(self):
+    """
+    Tests to check HAWQ
+    """
+    import params
+    Logger.info("--- Check if HAWQ can write and query from a table ---")
+    table = params.table_definition['HAWQ']
+    try:
+      self.drop_table(table)
+      self.create_table(table)
+      self.insert_data(table)
+      self.query_data(table)
+      self.validate_data(table)
+    except:
+      Logger.error("SERVICE CHECK FAILED: HAWQ was not able to write and query from a table")
+      self.checks_failed += 1
     finally:
-      self.drop_table()
+      self.drop_table(table)
 
-    Logger.info("Service check completed successfully")
+
+  def check_hawq_pxf_hdfs(self):
+    """
+    Tests to check if HAWQ can write and read external tables on HDFS using PXF
+    """
+    import params
+    Logger.info("--- Check if HAWQ can write and query from HDFS using PXF External Tables ---")
+    table_writable = params.table_definition['EXTERNAL_HDFS_WRITABLE']
+    table_readable = params.table_definition['EXTERNAL_HDFS_READABLE']
+    try:
+      self.delete_pxf_hdfs_test_dir()
+      self.drop_table(table_writable)
+      self.create_table(table_writable)
+      self.insert_data(table_writable)
+      self.drop_table(table_readable)
+      self.create_table(table_readable)
+      self.query_data(table_readable)
+      self.validate_data(table_readable)
+    except:
+      Logger.error("SERVICE CHECK FAILED: HAWQ was not able to write and query from HDFS using PXF External Tables")
+      self.checks_failed += 1
+    finally:
+      self.drop_table(table_readable)
+      self.drop_table(table_writable)
+      self.delete_pxf_hdfs_test_dir()
 
 
-  def drop_table(self):
-    Logger.info("Dropping {0} table if exists".format(hawq_constants.smoke_check_table_name))
-    sql_cmd = "drop table if exists {0}".format(hawq_constants.smoke_check_table_name)
+  def drop_table(self, table):
+    Logger.info("Dropping {0} table if exists".format(table['name']))
+    sql_cmd = "DROP {0} TABLE IF EXISTS {1}".format(table['drop_type'], table['name'])
     exec_psql_cmd(sql_cmd, self.active_master_host)
 
 
-  def create_table(self):
-    Logger.info("Creating table {0}".format(hawq_constants.smoke_check_table_name))
-    sql_cmd = "create table {0} (col1 int) distributed randomly".format(hawq_constants.smoke_check_table_name)
+  def create_table(self, table):
+    Logger.info("Creating table {0}".format(table['name']))
+    sql_cmd = "CREATE {0} TABLE {1} {2}".format(table['create_type'], table['name'], table['description'])
     exec_psql_cmd(sql_cmd, self.active_master_host)
 
 
-  def insert_data(self):
-    Logger.info("Inserting data to table {0}".format(hawq_constants.smoke_check_table_name))
-    sql_cmd = "insert into {0} select * from generate_series(1,10)".format(hawq_constants.smoke_check_table_name)
+  def insert_data(self, table):
+    Logger.info("Inserting data to table {0}".format(table['name']))
+    sql_cmd = "INSERT INTO  {0} SELECT * FROM generate_series(1,10)".format(table['name'])
     exec_psql_cmd(sql_cmd, self.active_master_host)
 
 
-  def query_data(self):
-    Logger.info("Querying data from table {0}".format(hawq_constants.smoke_check_table_name))
-    sql_cmd = "select * from {0}".format(hawq_constants.smoke_check_table_name)
+  def query_data(self, table):
+    Logger.info("Querying data from table {0}".format(table['name']))
+    sql_cmd = "SELECT * FROM {0}".format(table['name'])
     exec_psql_cmd(sql_cmd, self.active_master_host)
 
 
-  def check_data_correctness(self):
+  def validate_data(self, table):
     expected_data = "55"
     Logger.info("Validating data inserted, finding sum of all the inserted entries. Expected output: {0}".format(expected_data))
-    sql_cmd = "select sum(col1) from {0}".format(hawq_constants.smoke_check_table_name)
+    sql_cmd = "SELECT sum(col1) FROM {0}".format(table['name'])
     _, stdout, _ = exec_psql_cmd(sql_cmd, self.active_master_host, tuples_only=False)
     if expected_data != stdout.strip():
       Logger.error("Incorrect data returned. Expected Data: {0} Actual Data: {1}".format(expected_data, stdout))
       raise Fail("Incorrect data returned.")
 
 
-  def check_state(self):
+  def delete_pxf_hdfs_test_dir(self):
     import params
-    command = "source {0} && hawq state -d {1}".format(hawq_constants.hawq_greenplum_path_file, params.hawq_master_dir)
-    Logger.info("Executing hawq status check..")
-    (retcode, out, err) = exec_ssh_cmd(self.active_master_host, command)
-    if retcode:
-      Logger.error("hawq state command returned non-zero result: {0}. Out: {1} Error: {2}".format(retcode, out, err))
-      raise Fail("Unexpected result of hawq state command.")
-    Logger.info("Output of command:\n{0}".format(str(out) + "\n"))
+    params.HdfsResource(hawq_constants.pxf_hdfs_test_dir,
+                        type="directory",
+                        action="delete_on_execute")
 
 
 if __name__ == "__main__":
-  HAWQServiceCheck().execute()
+  HAWQServiceCheck().execute()
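
A note on why the PXF check uses two external tables over the same HDFS directory: in HAWQ a WRITABLE EXTERNAL table can only be INSERTed into and a READABLE EXTERNAL table can only be SELECTed from, so the check writes rows out through one and reads them back through the other. A condensed sketch of the sequence check_hawq_pxf_hdfs drives (error handling, table drops, and the surrounding class omitted; not the literal implementation):

# Condensed sketch of the HAWQ-PXF check flow added above.
def run_pxf_hdfs_check(check, params):
    writable = params.table_definition['EXTERNAL_HDFS_WRITABLE']
    readable = params.table_definition['EXTERNAL_HDFS_READABLE']
    check.delete_pxf_hdfs_test_dir()   # start with an empty HDFS test directory
    check.create_table(writable)       # CREATE WRITABLE EXTERNAL TABLE ... LOCATION ('pxf://...')
    check.insert_data(writable)        # INSERT pushes generate_series(1,10) out to HDFS via PXF
    check.create_table(readable)       # CREATE READABLE EXTERNAL TABLE over the same HDFS directory
    check.query_data(readable)         # SELECT reads the rows back through PXF
    check.validate_data(readable)      # sum(col1) over 1..10 must equal 55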

+ 3 - 3
ambari-server/src/main/resources/common-services/HAWQ/2.0.0/package/scripts/utils.py

@@ -82,7 +82,7 @@ def exec_ssh_cmd(hostname, cmd):
   import params
   # Only gpadmin should be allowed to run command via ssh, thus not exposing user as a parameter
   if params.hostname != hostname:
-    cmd = "su - {0} -c 'ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null {1} \"{2} \" '".format(hawq_constants.hawq_user, hostname, cmd)
+    cmd = "su - {0} -c \"ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null {1} \\\"{2} \\\" \"".format(hawq_constants.hawq_user, hostname, cmd)
   else:
     cmd = "su - {0} -c \"{1}\"".format(hawq_constants.hawq_user, cmd)
   Logger.info("Command executed: {0}".format(cmd))
@@ -97,9 +97,9 @@ def exec_psql_cmd(command, host, db="template1", tuples_only=True):
   """
   """
   src_cmd = "source {0}".format(hawq_constants.hawq_greenplum_path_file)
   src_cmd = "source {0}".format(hawq_constants.hawq_greenplum_path_file)
   if tuples_only:
   if tuples_only:
-    cmd = src_cmd + " && psql -d {0} -c \\\"{1};\\\"".format(db, command)
+    cmd = src_cmd + " && psql -d {0} -c \\\\\\\"{1};\\\\\\\"".format(db, command)
   else:
-    cmd = src_cmd + " && psql -t -d {0} -c \\\"{1};\\\"".format(db, command)
+    cmd = src_cmd + " && psql -t -d {0} -c \\\\\\\"{1};\\\\\\\"".format(db, command)
   retcode, out, err = exec_ssh_cmd(host, cmd)
   if retcode:
     Logger.error("SQL command executed failed: {0}\nReturncode: {1}\nStdout: {2}\nStderr: {3}".format(cmd, retcode, out, err))