ソースを参照

AMBARI-5800. Race condition when starting all services causing Hive service check to fail (Jonathan Hurley via ncole)

Nate Cole 11 年 前
コミット
a3665f2057

+ 42 - 9
ambari-server/src/main/resources/stacks/HDP/1.3.2/services/HIVE/package/scripts/hive_service.py

@@ -19,7 +19,9 @@ limitations under the License.
 """
 
 from resource_management import *
-
+import socket
+import sys
+import time
 
 def hive_service(
     name,
@@ -36,29 +38,60 @@ def hive_service(
     cmd = format(
       "env JAVA_HOME={java64_home} {start_hiveserver2_path} {hive_log_dir}/hive-server2.out {hive_log_dir}/hive-server2.log {pid_file} {hive_server_conf_dir} {hive_log_dir}")
 
-  is_started = format("ls {pid_file} >/dev/null 2>&1 && ps `cat {pid_file}` >/dev/null 2>&1")
+  process_id_exists = format("ls {pid_file} >/dev/null 2>&1 && ps `cat {pid_file}` >/dev/null 2>&1")
   
   if action == 'start':
     demon_cmd = format("{cmd}")
     
     Execute(demon_cmd,
             user=params.hive_user,
-            not_if=is_started
+            not_if=process_id_exists
     )
 
     if params.hive_jdbc_driver == "com.mysql.jdbc.Driver" or \
        params.hive_jdbc_driver == "org.postgresql.Driver" or \
        params.hive_jdbc_driver == "oracle.jdbc.driver.OracleDriver":
+      
       db_connection_check_command = format(
         "{java64_home}/bin/java -cp {check_db_connection_jar}:/usr/share/java/{jdbc_jar_name} org.apache.ambari.server.DBConnectionVerification {hive_jdbc_connection_url} {hive_metastore_user_name} {hive_metastore_user_passwd!p} {hive_jdbc_driver}")
+      
       Execute(db_connection_check_command,
-              path='/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin',
-      )
-
+              path='/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin')
+      
+    # AMBARI-5800 - wait for the server to come up instead of just the PID existance
+    if name == 'hiveserver2':
+      SOCKET_WAIT_SECONDS = 120
+      address=params.hive_server_host
+      port=int(params.hive_server_port)
+      
+      start_time = time.time()
+      end_time = start_time + SOCKET_WAIT_SECONDS
+      
+      s = socket.socket()
+      s.settimeout(5)
+            
+      is_service_socket_valid = False
+      print "Waiting for the Hive server to start..."
+      try:
+        while time.time() < end_time:
+          try:
+            s.connect((address, port))
+            is_service_socket_valid = True
+            break
+          except socket.error, e:          
+            time.sleep(5)
+      finally:
+        s.close()
+      
+      elapsed_time = time.time() - start_time    
+      
+      if is_service_socket_valid == False: 
+        raise Fail("Connection to Hive server %s on port %s failed after %d seconds" % (address, port, elapsed_time))
+      
+      print "Successfully connected to Hive at %s on port %s after %d seconds" % (address, port, elapsed_time)    
+            
   elif action == 'stop':
     demon_cmd = format("kill `cat {pid_file}` >/dev/null 2>&1 && rm -f {pid_file}")
     Execute(demon_cmd,
-            not_if = format("! ({is_started})")
+            not_if = format("! ({process_id_exists})")
     )
-
-

+ 41 - 6
ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HIVE/package/scripts/hive_service.py

@@ -19,7 +19,9 @@ limitations under the License.
 """
 
 from resource_management import *
-
+import socket
+import sys
+import time
 
 def hive_service(
     name,
@@ -36,27 +38,60 @@ def hive_service(
     cmd = format(
       "env JAVA_HOME={java64_home} {start_hiveserver2_path} {hive_log_dir}/hive-server2.out {hive_log_dir}/hive-server2.log {pid_file} {hive_server_conf_dir} {hive_log_dir}")
 
-  is_started = format("ls {pid_file} >/dev/null 2>&1 && ps `cat {pid_file}` >/dev/null 2>&1")
+  process_id_exists = format("ls {pid_file} >/dev/null 2>&1 && ps `cat {pid_file}` >/dev/null 2>&1")
   
   if action == 'start':
     demon_cmd = format("{cmd}")
     
     Execute(demon_cmd,
             user=params.hive_user,
-            not_if=is_started
+            not_if=process_id_exists
     )
 
     if params.hive_jdbc_driver == "com.mysql.jdbc.Driver" or \
        params.hive_jdbc_driver == "org.postgresql.Driver" or \
        params.hive_jdbc_driver == "oracle.jdbc.driver.OracleDriver":
+      
       db_connection_check_command = format(
         "{java64_home}/bin/java -cp {check_db_connection_jar}:/usr/share/java/{jdbc_jar_name} org.apache.ambari.server.DBConnectionVerification {hive_jdbc_connection_url} {hive_metastore_user_name} {hive_metastore_user_passwd!p} {hive_jdbc_driver}")
+      
       Execute(db_connection_check_command,
               path='/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin')
-
+      
+    # AMBARI-5800 - wait for the server to come up instead of just the PID existance
+    if name == 'hiveserver2':
+      SOCKET_WAIT_SECONDS = 120
+      address=params.hive_server_host
+      port=int(params.hive_server_port)
+      
+      start_time = time.time()
+      end_time = start_time + SOCKET_WAIT_SECONDS
+      
+      s = socket.socket()
+      s.settimeout(5)
+            
+      is_service_socket_valid = False
+      print "Waiting for the Hive server to start..."
+      try:
+        while time.time() < end_time:
+          try:
+            s.connect((address, port))
+            is_service_socket_valid = True
+            break
+          except socket.error, e:          
+            time.sleep(5)
+      finally:
+        s.close()
+      
+      elapsed_time = time.time() - start_time    
+      
+      if is_service_socket_valid == False: 
+        raise Fail("Connection to Hive server %s on port %s failed after %d seconds" % (address, port, elapsed_time))
+      
+      print "Successfully connected to Hive at %s on port %s after %d seconds" % (address, port, elapsed_time)    
+            
   elif action == 'stop':
     demon_cmd = format("kill `cat {pid_file}` >/dev/null 2>&1 && rm -f {pid_file}")
     Execute(demon_cmd,
-            not_if = format("! ({is_started})")
+            not_if = format("! ({process_id_exists})")
     )
-

+ 44 - 6
ambari-server/src/test/python/stacks/1.3.2/HIVE/test_hive_server.py

@@ -20,6 +20,8 @@ limitations under the License.
 from mock.mock import MagicMock, call, patch
 from stacks.utils.RMFTestCase import *
 
+import socket
+
 class TestHiveServer(RMFTestCase):
 
   def test_configure_default(self):
@@ -45,7 +47,10 @@ class TestHiveServer(RMFTestCase):
     )
     self.assertNoMoreResources()
   
-  def test_start_default(self):
+  @patch("socket.socket")
+  def test_start_default(self, socket_mock):
+    s = socket_mock.return_value
+    
     self.executeScript("1.3.2/services/HIVE/package/scripts/hive_server.py",
                          classname = "HiveServer",
                          command = "start",
@@ -76,9 +81,12 @@ class TestHiveServer(RMFTestCase):
                               path=['/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin']
     )
 
-    self.assertNoMoreResources()
+    self.assertNoMoreResources()    
+    self.assertTrue(socket_mock.called)
+    self.assertTrue(s.close.called)
 
-  def test_stop_default(self):
+  @patch("socket.socket")
+  def test_stop_default(self, socket_mock):
     self.executeScript("1.3.2/services/HIVE/package/scripts/hive_server.py",
                        classname = "HiveServer",
                        command = "stop",
@@ -88,9 +96,10 @@ class TestHiveServer(RMFTestCase):
     self.assertResourceCalled('Execute', 'kill `cat /var/run/hive/hive-server.pid` >/dev/null 2>&1 && rm -f /var/run/hive/hive-server.pid',
                               not_if = '! (ls /var/run/hive/hive-server.pid >/dev/null 2>&1 && ps `cat /var/run/hive/hive-server.pid` >/dev/null 2>&1)'
     )
+    
     self.assertNoMoreResources()
+    self.assertFalse(socket_mock.called)
 
-    
   def test_configure_secured(self):
     self.executeScript("1.3.2/services/HIVE/package/scripts/hive_server.py",
                        classname = "HiveServer",
@@ -114,7 +123,10 @@ class TestHiveServer(RMFTestCase):
     )
     self.assertNoMoreResources()
 
-  def test_start_secured(self):
+  @patch("socket.socket")
+  def test_start_secured(self, socket_mock):
+    s = socket_mock.return_value
+    
     self.executeScript("1.3.2/services/HIVE/package/scripts/hive_server.py",
                        classname = "HiveServer",
                        command = "start",
@@ -146,8 +158,11 @@ class TestHiveServer(RMFTestCase):
     )
 
     self.assertNoMoreResources()
+    self.assertTrue(socket_mock.called)    
+    self.assertTrue(s.close.called)
 
-  def test_stop_secured(self):
+  @patch("socket.socket")
+  def test_stop_secured(self, socket_mock):
     self.executeScript("1.3.2/services/HIVE/package/scripts/hive_server.py",
                        classname = "HiveServer",
                        command = "stop",
@@ -157,7 +172,9 @@ class TestHiveServer(RMFTestCase):
     self.assertResourceCalled('Execute', 'kill `cat /var/run/hive/hive-server.pid` >/dev/null 2>&1 && rm -f /var/run/hive/hive-server.pid',
                               not_if = '! (ls /var/run/hive/hive-server.pid >/dev/null 2>&1 && ps `cat /var/run/hive/hive-server.pid` >/dev/null 2>&1)'
     )
+    
     self.assertNoMoreResources()
+    self.assertFalse(socket_mock.called)
 
   def assert_configure_default(self):
     self.assertResourceCalled('HdfsDirectory', '/apps/hive/warehouse',
@@ -331,3 +348,24 @@ class TestHiveServer(RMFTestCase):
       group = 'hadoop',
     )
 
+  @patch("time.time")
+  @patch("socket.socket")
+  def test_socket_timeout(self, socket_mock, time_mock):        
+    s = socket_mock.return_value
+    s.connect = MagicMock()    
+    s.connect.side_effect = socket.error("")
+    
+    time_mock.side_effect = [0, 1000, 2000, 3000, 4000]
+    
+    try:
+      self.executeScript("1.3.2/services/HIVE/package/scripts/hive_server.py",
+                           classname = "HiveServer",
+                           command = "start",
+                           config_file="default.json"
+      )
+      
+      self.fail("Script failure due to socket error was expected")
+    except:
+      self.assert_configure_default()
+      self.assertTrue(socket_mock.called)
+      self.assertTrue(s.close.called)    

+ 44 - 4
ambari-server/src/test/python/stacks/2.0.6/HIVE/test_hive_server.py

@@ -20,6 +20,8 @@ limitations under the License.
 from mock.mock import MagicMock, call, patch
 from stacks.utils.RMFTestCase import *
 
+import socket
+
 class TestHiveServer(RMFTestCase):
 
   def test_configure_default(self):
@@ -45,7 +47,10 @@ class TestHiveServer(RMFTestCase):
     )
     self.assertNoMoreResources()
   
-  def test_start_default(self):
+  @patch("socket.socket")
+  def test_start_default(self, socket_mock):
+    s = socket_mock.return_value
+    
     self.executeScript("2.0.6/services/HIVE/package/scripts/hive_server.py",
                          classname = "HiveServer",
                          command = "start",
@@ -124,8 +129,11 @@ class TestHiveServer(RMFTestCase):
     )
 
     self.assertNoMoreResources()
+    self.assertTrue(socket_mock.called)
+    self.assertTrue(s.close.called)
 
-  def test_stop_default(self):
+  @patch("socket.socket")
+  def test_stop_default(self, socket_mock):
     self.executeScript("2.0.6/services/HIVE/package/scripts/hive_server.py",
                        classname = "HiveServer",
                        command = "stop",
@@ -135,7 +143,9 @@ class TestHiveServer(RMFTestCase):
     self.assertResourceCalled('Execute', 'kill `cat /var/run/hive/hive-server.pid` >/dev/null 2>&1 && rm -f /var/run/hive/hive-server.pid',
                               not_if = '! (ls /var/run/hive/hive-server.pid >/dev/null 2>&1 && ps `cat /var/run/hive/hive-server.pid` >/dev/null 2>&1)'
     )
+    
     self.assertNoMoreResources()
+    self.assertFalse(socket_mock.called)
 
     
   def test_configure_secured(self):
@@ -161,7 +171,10 @@ class TestHiveServer(RMFTestCase):
     )
     self.assertNoMoreResources()
 
-  def test_start_secured(self):
+  @patch("socket.socket")
+  def test_start_secured(self, socket_mock):
+    s = socket_mock.return_value
+    
     self.executeScript("2.0.6/services/HIVE/package/scripts/hive_server.py",
                        classname = "HiveServer",
                        command = "start",
@@ -193,8 +206,11 @@ class TestHiveServer(RMFTestCase):
     )
 
     self.assertNoMoreResources()
+    self.assertTrue(socket_mock.called)
+    self.assertTrue(s.close.called)
 
-  def test_stop_secured(self):
+  @patch("socket.socket")
+  def test_stop_secured(self, socket_mock):
     self.executeScript("2.0.6/services/HIVE/package/scripts/hive_server.py",
                        classname = "HiveServer",
                        command = "stop",
@@ -204,7 +220,9 @@ class TestHiveServer(RMFTestCase):
     self.assertResourceCalled('Execute', 'kill `cat /var/run/hive/hive-server.pid` >/dev/null 2>&1 && rm -f /var/run/hive/hive-server.pid',
                               not_if = '! (ls /var/run/hive/hive-server.pid >/dev/null 2>&1 && ps `cat /var/run/hive/hive-server.pid` >/dev/null 2>&1)'
     )
+    
     self.assertNoMoreResources()
+    self.assertFalse(socket_mock.called)
 
   def assert_configure_default(self):
     self.assertResourceCalled('HdfsDirectory', '/apps/hive/warehouse',
@@ -392,3 +410,25 @@ class TestHiveServer(RMFTestCase):
       owner = 'hive',
       group = 'hadoop',
     )
+    
+  @patch("time.time")
+  @patch("socket.socket")
+  def test_socket_timeout(self, socket_mock, time_mock):        
+    s = socket_mock.return_value
+    s.connect = MagicMock()    
+    s.connect.side_effect = socket.error("")
+    
+    time_mock.side_effect = [0, 1000, 2000, 3000, 4000]
+    
+    try:
+      self.executeScript("2.0.6/services/HIVE/package/scripts/hive_server.py",
+                           classname = "HiveServer",
+                           command = "start",
+                           config_file="default.json"
+      )
+      
+      self.fail("Script failure due to socket error was expected")
+    except:
+      self.assert_configure_default()
+      self.assertTrue(socket_mock.called)
+      self.assertTrue(s.close.called)