@@ -28,15 +28,12 @@ import tempfile
 import time
 from threading import Thread
-from PythonExecutor import PythonExecutor
-from CustomServiceOrchestrator import CustomServiceOrchestrator
-from FileCache import FileCache
+from FileCache import FileCache, CachingException
 from AmbariConfig import AmbariConfig
 from mock.mock import MagicMock, patch
 import StringIO
 import sys
-from ambari_agent import AgentException
-from AgentException import AgentException
+import shutil


 class TestFileCache(TestCase):
@@ -51,56 +48,314 @@ class TestFileCache(TestCase):
     self.config.add_section('agent')
     self.config.set('agent', 'prefix', tmpdir)
     self.config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+    self.config.set('agent', 'tolerate_download_failures', "true")


-  @patch("os.path.isdir")
-  def test_get_service_base_dir(self, isdir_mock):
+  def test_reset(self):
     fileCache = FileCache(self.config)
-    # Check existing dir case
-    isdir_mock.return_value = True
-    service_subpath = "HDP/2.1.1/services/ZOOKEEPER/package"
-    base = fileCache.get_service_base_dir(service_subpath)
-    self.assertEqual(base, "/var/lib/ambari-agent/cache/stacks/HDP/2.1.1/"
-                           "services/ZOOKEEPER/package")
-    # Check absent dir case
-    isdir_mock.return_value = False
-    try:
-      fileCache.get_service_base_dir(service_subpath)
-      self.fail("Should throw an exception")
-    except AgentException:
-      pass # Expected
+    fileCache.uptodate_paths.append('dummy-path')
+    fileCache.reset()
+    self.assertFalse(fileCache.uptodate_paths)


+  @patch.object(FileCache, "provide_directory")
+  def test_get_service_base_dir(self, provide_directory_mock):
+    provide_directory_mock.return_value = "dummy value"
+    fileCache = FileCache(self.config)
+    command = {
+      'commandParams' : {
+        'service_package_folder' : 'HDP/2.1.1/services/ZOOKEEPER/package'
+      }
+    }
+    res = fileCache.get_service_base_dir(command, "server_url_pref")
+    self.assertEquals(
+      pprint.pformat(provide_directory_mock.call_args_list[0][0]),
+      "('/var/lib/ambari-agent/cache',\n "
+      "'stacks/HDP/2.1.1/services/ZOOKEEPER/package',\n"
+      " 'server_url_pref')")
+    self.assertEquals(res, "dummy value")


-  @patch("os.path.isdir")
-  def test_get_hook_base_dir(self, isdir_mock):
+  @patch.object(FileCache, "provide_directory")
+  def test_get_hook_base_dir(self, provide_directory_mock):
     fileCache = FileCache(self.config)
     # Check missing parameter
     command = {
       'commandParams' : {
       }
     }
-    base = fileCache.get_hook_base_dir(command)
+    base = fileCache.get_hook_base_dir(command, "server_url_pref")
     self.assertEqual(base, None)
+    self.assertFalse(provide_directory_mock.called)

     # Check existing dir case
-    isdir_mock.return_value = True
     command = {
       'commandParams' : {
         'hooks_folder' : 'HDP/2.1.1/hooks'
       }
     }
-    base = fileCache.get_hook_base_dir(command)
-    self.assertEqual(base, "/var/lib/ambari-agent/cache/stacks/HDP/2.1.1/hooks")
+    provide_directory_mock.return_value = "dummy value"
+    fileCache = FileCache(self.config)
+    res = fileCache.get_hook_base_dir(command, "server_url_pref")
+    self.assertEquals(
+      pprint.pformat(provide_directory_mock.call_args_list[0][0]),
+      "('/var/lib/ambari-agent/cache', "
+      "'stacks/HDP/2.1.1/hooks', "
+      "'server_url_pref')")
+    self.assertEquals(res, "dummy value")
+
+
+  @patch.object(FileCache, "provide_directory")
+  def test_get_custom_actions_base_dir(self, provide_directory_mock):
+    provide_directory_mock.return_value = "dummy value"
+    fileCache = FileCache(self.config)
+    res = fileCache.get_custom_actions_base_dir("server_url_pref")
+    self.assertEquals(
+      pprint.pformat(provide_directory_mock.call_args_list[0][0]),
+      "('/var/lib/ambari-agent/cache', 'custom_actions', 'server_url_pref')")
+    self.assertEquals(res, "dummy value")
+
+
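+  # The scenarios below exercise provide_directory(): an initial download
+  # (directory invalidated, hash file and archive fetched, new hash written),
+  # a matching local hash (only the remote hash is fetched), and a path that
+  # has already been verified in this session (nothing is fetched).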
+  @patch.object(FileCache, "build_download_url")
+  @patch.object(FileCache, "fetch_url")
+  @patch.object(FileCache, "read_hash_sum")
+  @patch.object(FileCache, "invalidate_directory")
+  @patch.object(FileCache, "unpack_archive")
+  @patch.object(FileCache, "write_hash_sum")
+  def test_provide_directory(self, write_hash_sum_mock, unpack_archive_mock,
+                             invalidate_directory_mock,
+                             read_hash_sum_mock, fetch_url_mock,
+                             build_download_url_mock):
+    build_download_url_mock.return_value = "http://dummy-url/"
+    HASH1 = "hash1"
+    membuffer = MagicMock()
+    membuffer.getvalue.return_value.strip.return_value = HASH1
+    fileCache = FileCache(self.config)
+
+    # Test uptodate dirs after start
+    self.assertFalse(fileCache.uptodate_paths)
+
+    # Test initial downloading (when dir does not exist)
+    fetch_url_mock.return_value = membuffer
+    read_hash_sum_mock.return_value = "hash2"
+    res = fileCache.provide_directory("cache_path", "subdirectory",
+                                      "server_url_prefix")
+    self.assertTrue(invalidate_directory_mock.called)
+    self.assertTrue(write_hash_sum_mock.called)
+    self.assertEquals(fetch_url_mock.call_count, 2)
+    self.assertEquals(pprint.pformat(fileCache.uptodate_paths),
+                      "['cache_path/subdirectory']")
+    self.assertEquals(res, 'cache_path/subdirectory')
+
+    fetch_url_mock.reset_mock()
+    write_hash_sum_mock.reset_mock()
+    invalidate_directory_mock.reset_mock()
+    unpack_archive_mock.reset_mock()
+
+    # Test that the directory is not invalidated when the local hash
+    # matches the remote one
+    fetch_url_mock.return_value = membuffer
+    read_hash_sum_mock.return_value = HASH1
+    fileCache.reset()
+
+    res = fileCache.provide_directory("cache_path", "subdirectory",
+                                      "server_url_prefix")
+    self.assertFalse(invalidate_directory_mock.called)
+    self.assertFalse(write_hash_sum_mock.called)
+    self.assertEquals(fetch_url_mock.call_count, 1)
+    self.assertEquals(pprint.pformat(fileCache.uptodate_paths),
+                      "['cache_path/subdirectory']")
+    self.assertEquals(res, 'cache_path/subdirectory')
+
+    fetch_url_mock.reset_mock()
+    write_hash_sum_mock.reset_mock()
+    invalidate_directory_mock.reset_mock()
+    unpack_archive_mock.reset_mock()
+
+    # Test execution path when path is up-to-date (already checked)
+    res = fileCache.provide_directory("cache_path", "subdirectory",
+                                      "server_url_prefix")
+    self.assertFalse(invalidate_directory_mock.called)
+    self.assertFalse(write_hash_sum_mock.called)
+    self.assertEquals(fetch_url_mock.call_count, 0)
+    self.assertEquals(pprint.pformat(fileCache.uptodate_paths),
+                      "['cache_path/subdirectory']")
+    self.assertEquals(res, 'cache_path/subdirectory')
+
+    # Check exception handling when tolerance is disabled
+    self.config.set('agent', 'tolerate_download_failures', "false")
+    fetch_url_mock.side_effect = self.caching_exc_side_effect
+    fileCache = FileCache(self.config)
+    try:
+      fileCache.provide_directory("cache_path", "subdirectory",
+                                  "server_url_prefix")
+      self.fail('CachingException not thrown')
+    except CachingException:
+      pass # Expected
+    except Exception, e:
+      self.fail('Unexpected exception thrown:' + str(e))
+
+    # Check that unexpected (non-caching) exceptions are still propagated
+    self.config.set('agent', 'tolerate_download_failures', "false")
+    fetch_url_mock.side_effect = self.exc_side_effect
+    fileCache = FileCache(self.config)
+    try:
+      fileCache.provide_directory("cache_path", "subdirectory",
+                                  "server_url_prefix")
+      self.fail('Exception not thrown')
+    except Exception:
+      pass # Expected
-    # Check absent dir case
+
+    # Check exception handling when tolerance is enabled
+    self.config.set('agent', 'tolerate_download_failures', "true")
+    fetch_url_mock.side_effect = self.caching_exc_side_effect
+    fileCache = FileCache(self.config)
+    res = fileCache.provide_directory("cache_path", "subdirectory",
+                                      "server_url_prefix")
+    self.assertEquals(res, 'cache_path/subdirectory')
+
+
+  def test_build_download_url(self):
+    fileCache = FileCache(self.config)
+    url = fileCache.build_download_url('http://localhost:8080/resources/',
+                                       'stacks/HDP/2.1.1/hooks', 'archive.zip')
+    self.assertEqual(url,
+        'http://localhost:8080/resources//stacks/HDP/2.1.1/hooks/archive.zip')
+
+
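+  # fetch_url() below is expected to read the mocked stream chunk by chunk
+  # into an in-memory buffer and to wrap read failures in a CachingException.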
+  @patch("urllib2.urlopen")
+  def test_fetch_url(self, urlopen_mock):
+    fileCache = FileCache(self.config)
+    remote_url = "http://dummy-url/"
+    # Test normal download
+    test_str = 'abc' * 100000 # Very long string
+    test_string_io = StringIO.StringIO(test_str)
+    test_buffer = MagicMock()
+    test_buffer.read.side_effect = test_string_io.read
+    urlopen_mock.return_value = test_buffer
+
+    memory_buffer = fileCache.fetch_url(remote_url)
+
+    self.assertEquals(memory_buffer.getvalue(), test_str)
+    self.assertEqual(test_buffer.read.call_count, 20) # depends on buffer size
+    # Test exception handling
+    test_buffer.read.side_effect = self.exc_side_effect
+    try:
+      fileCache.fetch_url(remote_url)
+      self.fail('CachingException not thrown')
+    except CachingException:
+      pass # Expected
+    except Exception, e:
+      self.fail('Unexpected exception thrown:' + str(e))
+
+
+  def test_read_write_hash_sum(self):
+    tmpdir = tempfile.mkdtemp()
+    dummyhash = "DUMMY_HASH"
+    fileCache = FileCache(self.config)
+    fileCache.write_hash_sum(tmpdir, dummyhash)
+    newhash = fileCache.read_hash_sum(tmpdir)
+    self.assertEquals(newhash, dummyhash)
+    shutil.rmtree(tmpdir)
+    # Test reading a non-existing file
+    newhash = fileCache.read_hash_sum(tmpdir)
+    self.assertEquals(newhash, None)
+    # Test writing to a non-existing file
+    with patch("__builtin__.open") as open_mock:
+      open_mock.side_effect = self.exc_side_effect
+      try:
+        fileCache.write_hash_sum(tmpdir, dummyhash)
+        self.fail('CachingException not thrown')
+      except CachingException:
+        pass # Expected
+      except Exception, e:
+        self.fail('Unexpected exception thrown:' + str(e))
+
+
+  @patch("os.path.isfile")
+  @patch("os.path.isdir")
+  @patch("os.unlink")
+  @patch("shutil.rmtree")
+  @patch("os.makedirs")
+  def test_invalidate_directory(self, makedirs_mock, rmtree_mock,
+                                unlink_mock, isdir_mock, isfile_mock):
+    fileCache = FileCache(self.config)
+    # Test execution flow if path points to file
+    isfile_mock.return_value = True
     isdir_mock.return_value = False
+
+    fileCache.invalidate_directory("dummy-dir")
+
+    self.assertTrue(unlink_mock.called)
+    self.assertFalse(rmtree_mock.called)
+    self.assertTrue(makedirs_mock.called)
+
+    unlink_mock.reset_mock()
+    rmtree_mock.reset_mock()
+    makedirs_mock.reset_mock()
+
+    # Test execution flow if path points to dir
+    isfile_mock.return_value = False
+    isdir_mock.return_value = True
+
+    fileCache.invalidate_directory("dummy-dir")
+
+    self.assertFalse(unlink_mock.called)
+    self.assertTrue(rmtree_mock.called)
+    self.assertTrue(makedirs_mock.called)
+
+    unlink_mock.reset_mock()
+    rmtree_mock.reset_mock()
+    makedirs_mock.reset_mock()
+
+    # Test exception handling
+    makedirs_mock.side_effect = self.exc_side_effect
     try:
-      fileCache.get_hook_base_dir(command)
-      self.fail("Should throw an exception")
-    except AgentException:
+      fileCache.invalidate_directory("dummy-dir")
+      self.fail('CachingException not thrown')
+    except CachingException:
       pass # Expected
+    except Exception, e:
+      self.fail('Unexpected exception thrown:' + str(e))
+
+
+  def test_unpack_archive(self):
+    tmpdir = tempfile.mkdtemp()
+    dummy_archive = os.path.join("ambari_agent", "dummy_files",
+                                 "dummy_archive.zip")
+    # Test normal flow
+    with open(dummy_archive, "r") as f:
+      data = f.read(os.path.getsize(dummy_archive))
+      membuf = StringIO.StringIO(data)
+
+    fileCache = FileCache(self.config)
+    fileCache.unpack_archive(membuf, tmpdir)
+    # Count total size and number of unpacked files:
+    total_size = 0
+    total_files = 0
+    total_dirs = 0
+    for dirpath, dirnames, filenames in os.walk(tmpdir):
+      total_dirs += 1
+      for f in filenames:
+        fp = os.path.join(dirpath, f)
+        total_size += os.path.getsize(fp)
+        total_files += 1
+    self.assertEquals(total_size, 51258L)
+    self.assertEquals(total_files, 28)
+    self.assertEquals(total_dirs, 8)
+    shutil.rmtree(tmpdir)
+
+    # Test exception handling
+    with patch("os.path.isdir") as isdir_mock:
+      isdir_mock.side_effect = self.exc_side_effect
+      try:
+        fileCache.unpack_archive(membuf, tmpdir)
+        self.fail('CachingException not thrown')
+      except CachingException:
+        pass # Expected
+      except Exception, e:
+        self.fail('Unexpected exception thrown:' + str(e))


   def tearDown(self):
@@ -108,3 +363,8 @@ class TestFileCache(TestCase):
     sys.stdout = sys.__stdout__


+  def exc_side_effect(self, *a):
+    raise Exception("horrible_exc")
+
+  def caching_exc_side_effect(self, *a):
+    raise CachingException("horrible_caching_exc")