Browse Source

AMBARI-10953. Hive Server 2 Process Check Should Always Use Beeline Instead of TCP Sockets (dlysnichenko)

Lisnichenko Dmitro 10 years ago
parent
commit
ccd0c44b64

+ 38 - 26
ambari-common/src/main/python/resource_management/libraries/functions/hive_check.py

@@ -22,35 +22,47 @@ from resource_management.core.resources import Execute
 from resource_management.libraries.functions import format
 import socket
 
-def check_thrift_port_sasl(address, port, hive_auth = "NOSASL", key = None, kinitcmd = None, smokeuser = 'ambari-qa',
-                           transport_mode = "binary"):
+
+def check_thrift_port_sasl(address, port, hive_auth="NOSASL", key=None, kinitcmd=None, smokeuser='ambari-qa',
+                           transport_mode="binary", http_endpoint="cliservice", ssl=False, ssl_keystore=None,
+                           ssl_password=None):
   """
   Hive thrift SASL port check
   """
-  BEELINE_CHECK_TIMEOUT = 30
 
+  # params may arrive as config strings (str or unicode) - normalize them to int/bool
+  if isinstance(port, basestring):
+    port = int(port)
+
+  if isinstance(ssl, basestring):
+    ssl = ssl.strip().lower() == "true"
+
+  # lowercase 'true'/'false' literal used for the ssl= part of the beeline url
+  ssl_str = str(ssl).lower()
+  beeline_check_timeout = 30
+  beeline_url = ['jdbc:hive2://{address}:{port}/', "transportMode={transport_mode}"]
+
+  # append url according to used transport
+  if transport_mode == "http":
+    beeline_url.append('httpPath={http_endpoint}')
+
+  # append url according to used auth
+  if hive_auth == "NOSASL":
+    beeline_url.append('auth=noSasl')
+
+  # append url according to ssl configuration
+  if ssl and ssl_keystore is not None and ssl_password is not None:
+    beeline_url.extend(['ssl={ssl_str}', 'sslTrustStore={ssl_keystore}', 'trustStorePassword={ssl_password!p}'])
+
+  # append url according to kerberos setting
   if kinitcmd:
-    url = format("jdbc:hive2://{address}:{port}/;principal={key}")
-    Execute(kinitcmd,
-            user=smokeuser
-    )
-  else:
-    url = format("jdbc:hive2://{address}:{port}")
-
-  if hive_auth != "NOSASL" and transport_mode != "http":
-    cmd = format("! beeline -u '{url}' -e '' ") + "2>&1| awk '{print}'|grep -i -e 'Connection refused' -e 'Invalid URL'"
-    Execute(cmd,
-            user=smokeuser,
-            path=["/bin/", "/usr/bin/", "/usr/lib/hive/bin/", "/usr/sbin/"],
-            timeout=BEELINE_CHECK_TIMEOUT
-    )
-  else:
-    s = socket.socket()
-    s.settimeout(1)
-    try:
-      s.connect((address, port))
-    except socket.error, e:
-      raise
-    finally:
-      s.close()
+    beeline_url.append('principal={key}')
+    Execute(kinitcmd, user=smokeuser)
 
+  cmd = "! beeline -u '%s' -e '' 2>&1| awk '{print}'|grep -i -e 'Connection refused' -e 'Invalid URL'" % \
+        format(";".join(beeline_url))
+  Execute(cmd,
+          user=smokeuser,
+          path=["/bin/", "/usr/bin/", "/usr/lib/hive/bin/", "/usr/sbin/"],
+          timeout=beeline_check_timeout
+  )

+ 24 - 9
ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/alerts/alert_hive_thrift_port.py

@@ -36,6 +36,9 @@ HIVE_SERVER_PRINCIPAL_KEY = '{{hive-site/hive.server2.authentication.kerberos.pr
 SMOKEUSER_KEYTAB_KEY = '{{cluster-env/smokeuser_keytab}}'
 SMOKEUSER_PRINCIPAL_KEY = '{{cluster-env/smokeuser_principal_name}}'
 SMOKEUSER_KEY = '{{cluster-env/smokeuser}}'
+HIVE_SSL = '{{hive-site/hive.server2.use.SSL}}'
+HIVE_SSL_KEYSTORE_PATH = '{{hive-site/hive.server2.keystore.path}}'
+HIVE_SSL_KEYSTORE_PASSWORD = '{{hive-site/hive.server2.keystore.password}}'
 
 # The configured Kerberos executable search paths, if any
 KERBEROS_EXECUTABLE_SEARCH_PATHS_KEY = '{{kerberos-env/executable_search_paths}}'
@@ -62,10 +65,11 @@ def get_tokens():
   Returns a tuple of tokens in the format {{site/property}} that will be used
   to build the dictionary passed into execute
   """
-  return (HIVE_SERVER_THRIFT_PORT_KEY,SECURITY_ENABLED_KEY, SMOKEUSER_KEY,
-    HIVE_SERVER2_AUTHENTICATION_KEY,HIVE_SERVER_PRINCIPAL_KEY,
-    SMOKEUSER_KEYTAB_KEY,SMOKEUSER_PRINCIPAL_KEY,HIVE_SERVER_THRIFT_HTTP_PORT_KEY,
-    HIVE_SERVER_TRANSPORT_MODE_KEY,KERBEROS_EXECUTABLE_SEARCH_PATHS_KEY)
+  return (HIVE_SERVER_THRIFT_PORT_KEY, SECURITY_ENABLED_KEY, SMOKEUSER_KEY,
+          HIVE_SERVER2_AUTHENTICATION_KEY, HIVE_SERVER_PRINCIPAL_KEY,
+          SMOKEUSER_KEYTAB_KEY, SMOKEUSER_PRINCIPAL_KEY, HIVE_SERVER_THRIFT_HTTP_PORT_KEY,
+          HIVE_SERVER_TRANSPORT_MODE_KEY, KERBEROS_EXECUTABLE_SEARCH_PATHS_KEY, HIVE_SSL,
+          HIVE_SSL_KEYSTORE_PATH, HIVE_SSL_KEYSTORE_PASSWORD)
 
 
 def execute(configurations={}, parameters={}, host_name=None):
@@ -88,7 +92,7 @@ def execute(configurations={}, parameters={}, host_name=None):
   port = THRIFT_PORT_DEFAULT
   if transport_mode.lower() == 'binary' and HIVE_SERVER_THRIFT_PORT_KEY in configurations:
     port = int(configurations[HIVE_SERVER_THRIFT_PORT_KEY])
-  elif  transport_mode.lower() == 'http' and HIVE_SERVER_THRIFT_HTTP_PORT_KEY in configurations:
+  elif transport_mode.lower() == 'http' and HIVE_SERVER_THRIFT_HTTP_PORT_KEY in configurations:
     port = int(configurations[HIVE_SERVER_THRIFT_HTTP_PORT_KEY])
 
   security_enabled = False
@@ -99,6 +103,18 @@ def execute(configurations={}, parameters={}, host_name=None):
   if HIVE_SERVER2_AUTHENTICATION_KEY in configurations:
     hive_server2_authentication = configurations[HIVE_SERVER2_AUTHENTICATION_KEY]
 
+  hive_ssl = False
+  if HIVE_SSL in configurations:
+    hive_ssl = configurations[HIVE_SSL]
+
+  hive_ssl_keystore_path = None
+  if HIVE_SSL_KEYSTORE_PATH in configurations:
+    hive_ssl_keystore_path = configurations[HIVE_SSL_KEYSTORE_PATH]
+
+  hive_ssl_keystore_password = None
+  if HIVE_SSL_KEYSTORE_PASSWORD in configurations:
+    hive_ssl_keystore_password = configurations[HIVE_SSL_KEYSTORE_PASSWORD]
+
   # defaults
   smokeuser_keytab = SMOKEUSER_KEYTAB_DEFAULT
   smokeuser_principal = SMOKEUSER_PRINCIPAL_DEFAULT
@@ -151,10 +167,9 @@ def execute(configurations={}, parameters={}, host_name=None):
     start_time = time.time()
 
     try:
-      hive_check.check_thrift_port_sasl(host_name, port,
-        hive_server2_authentication, hive_server_principal, kinitcmd, smokeuser,
-        transport_mode = transport_mode)
-
+      hive_check.check_thrift_port_sasl(host_name, port, hive_server2_authentication, hive_server_principal,
+                                        kinitcmd, smokeuser, transport_mode=transport_mode, ssl=hive_ssl,
+                                        ssl_keystore=hive_ssl_keystore_path, ssl_password=hive_ssl_keystore_password)
       result_code = 'OK'
       total_time = time.time() - start_time
       label = OK_MESSAGE.format(total_time, port)

+ 10 - 8
ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_service.py

@@ -89,9 +89,7 @@ def hive_service(name, action='start', rolling_restart=False):
     # AMBARI-5800 - wait for the server to come up instead of just the PID existance
     if name == 'hiveserver2':
       SOCKET_WAIT_SECONDS = 120
-      address=params.hostname
-      port=int(params.hive_server_port)
-      
+
       start_time = time.time()
       end_time = start_time + SOCKET_WAIT_SECONDS
 
@@ -103,9 +101,11 @@ def hive_service(name, action='start', rolling_restart=False):
         kinitcmd=None
       while time.time() < end_time:
         try:
-          check_thrift_port_sasl(address, port, params.hive_server2_authentication,
+          check_thrift_port_sasl(params.hostname, params.hive_server_port, params.hive_server2_authentication,
                                  params.hive_server_principal, kinitcmd, params.smokeuser,
-                                 transport_mode=params.hive_transport_mode)
+                                 transport_mode=params.hive_transport_mode, http_endpoint=params.hive_http_endpoint,
+                                 ssl=params.hive_ssl, ssl_keystore=params.hive_ssl_keystore_path,
+                                 ssl_password=params.hive_ssl_keystore_password)
           is_service_socket_valid = True
           break
         except Exception, e:
@@ -113,10 +113,12 @@ def hive_service(name, action='start', rolling_restart=False):
 
       elapsed_time = time.time() - start_time
       
-      if is_service_socket_valid == False: 
-        raise Fail("Connection to Hive server %s on port %s failed after %d seconds" % (address, port, elapsed_time))
+      if not is_service_socket_valid:
+        raise Fail("Connection to Hive server %s on port %s failed after %d seconds" %
+                   (params.hostname, params.hive_server_port, elapsed_time))
       
-      print "Successfully connected to Hive at %s on port %s after %d seconds" % (address, port, elapsed_time)    
+      print "Successfully connected to Hive at %s on port %s after %d seconds" %\
+            (params.hostname, params.hive_server_port, elapsed_time)
             
   elif action == 'stop':
 

+ 6 - 0
ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/params_linux.py

@@ -147,9 +147,15 @@ else:
   hive_server_port = default('/configurations/hive-site/hive.server2.thrift.port',"10000")
 
 hive_url = format("jdbc:hive2://{hive_server_host}:{hive_server_port}")
+hive_http_endpoint = default('/configurations/hive-site/hive.server2.thrift.http.path', "cliservice")
 hive_server_principal = config['configurations']['hive-site']['hive.server2.authentication.kerberos.principal']
 hive_server2_authentication = config['configurations']['hive-site']['hive.server2.authentication']
 
+# ssl options
+hive_ssl = default('/configurations/hive-site/hive.server2.use.SSL', False)
+hive_ssl_keystore_path = default('/configurations/hive-site/hive.server2.keystore.path', None)
+hive_ssl_keystore_password = default('/configurations/hive-site/hive.server2.keystore.password', None)
+
 smokeuser = config['configurations']['cluster-env']['smokeuser']
 smoke_test_sql = format("{tmp_dir}/hiveserver2.sql")
 smoke_test_path = format("{tmp_dir}/hiveserver2Smoke.sh")

+ 3 - 1
ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/service_check.py

@@ -63,7 +63,9 @@ class HiveServiceCheckDefault(HiveServiceCheck):
       try:
         check_thrift_port_sasl(address, port, params.hive_server2_authentication,
                                params.hive_server_principal, kinitcmd, params.smokeuser,
-                               transport_mode=params.hive_transport_mode)
+                               transport_mode=params.hive_transport_mode, http_endpoint=params.hive_http_endpoint,
+                               ssl=params.hive_ssl, ssl_keystore=params.hive_ssl_keystore_path,
+                               ssl_password=params.hive_ssl_keystore_password)
         print "Successfully connected to %s on port %s" % (address, port)
         workable_server_available = True
       except:

+ 25 - 23
ambari-server/src/test/python/stacks/2.0.6/HIVE/test_hive_server.py

@@ -76,13 +76,16 @@ class TestHiveServer(RMFTestCase):
                               tries=5,
                               try_sleep=10
     )
+    self.assertResourceCalled('Execute', "! beeline -u 'jdbc:hive2://c6401.ambari.apache.org:10000/;transportMode=binary;auth=noSasl' -e '' 2>&1| awk '{print}'|grep -i -e 'Connection refused' -e 'Invalid URL'",
+                              path = ['/bin/', '/usr/bin/', '/usr/lib/hive/bin/', '/usr/sbin/'],
+                              user = 'ambari-qa',
+                              timeout = 30,
+                              )
     self.assertNoMoreResources()
 
 
   @patch.object(dynamic_variable_interpretation, "_get_tar_source_and_dest_folder")
-  @patch("socket.socket")
-  def test_start_default_no_copy(self, socket_mock, get_tar_mock):
-    s = socket_mock.return_value
+  def test_start_default_no_copy(self, get_tar_mock):
 
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/hive_server.py",
                        classname = "HiveServer",
@@ -109,17 +112,16 @@ class TestHiveServer(RMFTestCase):
     self.assertResourceCalled('Execute', '/usr/jdk64/jdk1.7.0_45/bin/java -cp /usr/lib/ambari-agent/DBConnectionVerification.jar:/usr/lib/hive/lib//mysql-connector-java.jar org.apache.ambari.server.DBConnectionVerification \'jdbc:mysql://c6402.ambari.apache.org/hive?createDatabaseIfNotExist=true\' hive \'!`"\'"\'"\' 1\' com.mysql.jdbc.Driver',
                               path=['/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin'], tries=5, try_sleep=10
     )
-
+    self.assertResourceCalled('Execute', "! beeline -u 'jdbc:hive2://c6401.ambari.apache.org:10000/;transportMode=binary;auth=noSasl' -e '' 2>&1| awk '{print}'|grep -i -e 'Connection refused' -e 'Invalid URL'",
+                              path = ['/bin/', '/usr/bin/', '/usr/lib/hive/bin/', '/usr/sbin/'],
+                              user = 'ambari-qa',
+                              timeout = 30,
+                              )
     self.assertNoMoreResources()
-    self.assertTrue(socket_mock.called)
-    self.assertTrue(s.close.called)
     self.assertFalse(get_tar_mock.called)
 
   @patch.object(dynamic_variable_interpretation, "_get_tar_source_and_dest_folder")
-  @patch("socket.socket")
-  def test_start_default_alt_tmp(self, socket_mock, get_tar_mock):
-    s = socket_mock.return_value
-
+  def test_start_default_alt_tmp(self, get_tar_mock):
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/hive_server.py",
                        classname = "HiveServer",
                        command = "start",
@@ -145,18 +147,17 @@ class TestHiveServer(RMFTestCase):
     self.assertResourceCalled('Execute', '/usr/jdk64/jdk1.7.0_45/bin/java -cp /usr/lib/ambari-agent/DBConnectionVerification.jar:/usr/lib/hive/lib//mysql-connector-java.jar org.apache.ambari.server.DBConnectionVerification \'jdbc:mysql://c6402.ambari.apache.org/hive?createDatabaseIfNotExist=true\' hive \'!`"\'"\'"\' 1\' com.mysql.jdbc.Driver',
                               path=['/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin'], tries=5, try_sleep=10
     )
-
+    self.assertResourceCalled('Execute', "! beeline -u 'jdbc:hive2://c6401.ambari.apache.org:10000/;transportMode=binary;auth=noSasl' -e '' 2>&1| awk '{print}'|grep -i -e 'Connection refused' -e 'Invalid URL'",
+                              path = ['/bin/', '/usr/bin/', '/usr/lib/hive/bin/', '/usr/sbin/'],
+                              user = 'ambari-qa',
+                              timeout = 30,
+                              )
     self.assertNoMoreResources()
-    self.assertTrue(socket_mock.called)
-    self.assertTrue(s.close.called)
     self.assertFalse(get_tar_mock.called)
 
 
   @patch.object(dynamic_variable_interpretation, "_get_tar_source_and_dest_folder")
-  @patch("socket.socket")
-  def test_start_default_alt_nn_ha_tmp(self, socket_mock, get_tar_mock):
-    s = socket_mock.return_value
-
+  def test_start_default_alt_nn_ha_tmp(self, get_tar_mock):
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/hive_server.py",
                        classname = "HiveServer",
                        command = "start",
@@ -182,15 +183,16 @@ class TestHiveServer(RMFTestCase):
     self.assertResourceCalled('Execute', '/usr/jdk64/jdk1.7.0_45/bin/java -cp /usr/lib/ambari-agent/DBConnectionVerification.jar:/usr/lib/hive/lib//mysql-connector-java.jar org.apache.ambari.server.DBConnectionVerification \'jdbc:mysql://c6402.ambari.apache.org/hive?createDatabaseIfNotExist=true\' hive \'!`"\'"\'"\' 1\' com.mysql.jdbc.Driver',
                               path=['/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin'], tries=5, try_sleep=10
     )
-
+    self.assertResourceCalled('Execute', "! beeline -u 'jdbc:hive2://c6401.ambari.apache.org:10000/;transportMode=binary;auth=noSasl' -e '' 2>&1| awk '{print}'|grep -i -e 'Connection refused' -e 'Invalid URL'",
+                              path = ['/bin/', '/usr/bin/', '/usr/lib/hive/bin/', '/usr/sbin/'],
+                              user = 'ambari-qa',
+                              timeout = 30,
+                              )
     self.assertNoMoreResources()
-    self.assertTrue(socket_mock.called)
-    self.assertTrue(s.close.called)
     self.assertFalse(get_tar_mock.called)
 
-  @patch("socket.socket")
   @patch.object(dynamic_variable_interpretation, "copy_tarballs_to_hdfs", new=MagicMock())
-  def test_stop_default(self, socket_mock):
+  def test_stop_default(self):
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/hive_server.py",
                        classname = "HiveServer",
                        command = "stop",
@@ -263,7 +265,7 @@ class TestHiveServer(RMFTestCase):
                               user='ambari-qa',
     )
     self.assertResourceCalled('Execute',
-                              "! beeline -u 'jdbc:hive2://c6401.ambari.apache.org:10000/;principal=hive/_HOST@EXAMPLE.COM' -e '' 2>&1| awk '{print}'|grep -i -e 'Connection refused' -e 'Invalid URL'",
+                              "! beeline -u 'jdbc:hive2://c6401.ambari.apache.org:10000/;transportMode=binary;principal=hive/_HOST@EXAMPLE.COM' -e '' 2>&1| awk '{print}'|grep -i -e 'Connection refused' -e 'Invalid URL'",
                               path=['/bin/', '/usr/bin/', '/usr/lib/hive/bin/', '/usr/sbin/'],
                               user='ambari-qa',
                               timeout=30,

+ 6 - 2
ambari-server/src/test/python/stacks/2.0.6/HIVE/test_hive_service_check.py

@@ -38,6 +38,11 @@ class TestServiceCheck(RMFTestCase):
                         hdp_stack_version = self.STACK_VERSION,
                         target = RMFTestCase.TARGET_COMMON_SERVICES
     )
+    self.assertResourceCalled('Execute', "! beeline -u 'jdbc:hive2://c6402.ambari.apache.org:10000/;transportMode=binary;auth=noSasl' -e '' 2>&1| awk '{print}'|grep -i -e 'Connection refused' -e 'Invalid URL'",
+                              path = ['/bin/', '/usr/bin/', '/usr/lib/hive/bin/', '/usr/sbin/'],
+                              user = 'ambari-qa',
+                              timeout = 30,
+                              )
     self.assertResourceCalled('File', '/tmp/hcatSmoke.sh',
                         content = StaticFile('hcatSmoke.sh'),
                         mode = 0755,
@@ -84,7 +89,6 @@ class TestServiceCheck(RMFTestCase):
                               try_sleep = 5,
                               )
     self.assertNoMoreResources()
-    self.assertTrue(socket_mock.called)
 
   @patch("sys.exit")
   def test_service_check_secured(self, sys_exit_mock, socket_mock):
@@ -99,7 +103,7 @@ class TestServiceCheck(RMFTestCase):
     self.assertResourceCalled('Execute', '/usr/bin/kinit -kt /etc/security/keytabs/smokeuser.headless.keytab ambari-qa@EXAMPLE.COM; ',
                               user = 'ambari-qa',
                               )
-    self.assertResourceCalled('Execute', "! beeline -u 'jdbc:hive2://c6402.ambari.apache.org:10000/;principal=hive/_HOST@EXAMPLE.COM' -e '' 2>&1| awk '{print}'|grep -i -e 'Connection refused' -e 'Invalid URL'",
+    self.assertResourceCalled('Execute', "! beeline -u 'jdbc:hive2://c6402.ambari.apache.org:10000/;transportMode=binary;principal=hive/_HOST@EXAMPLE.COM' -e '' 2>&1| awk '{print}'|grep -i -e 'Connection refused' -e 'Invalid URL'",
                               path = ['/bin/', '/usr/bin/', '/usr/lib/hive/bin/', '/usr/sbin/'],
                               user = 'ambari-qa',
                               timeout = 30,