@@ -0,0 +1,228 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+import unittest, getpass, os, sys, re, threading, time
+
+myDirectory = os.path.realpath(sys.argv[0])
+rootDirectory = re.sub("/testing/.*", "", myDirectory)
+
+sys.path.append(rootDirectory)
+
+from testing.lib import BaseTestSuite, MockLogger, MockHadoopCluster
+from hodlib.Hod.hod import hodRunner, hodState
+from hodlib.Common.desc import NodePoolDesc
+
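+# Tests listed here are excluded when the suite is built (see BaseTestSuite below); none for this module.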
+excludes = []
+
+# Information about all clusters is written to a file called clusters.state.
+TEST_CLUSTER_DATA_FILE='clusters'
+
+# Temp directory prefix
+TMP_DIR_PREFIX=os.path.join('/tmp', 'hod-%s' % (getpass.getuser()))
+
+# Test class that defines methods to test invalid arguments to hod operations.
+class test_InvalidArgsOperations(unittest.TestCase):
+
+  # build a config object with all required keys for initializing hod.
+  def setupConf():
+    cfg = {
+            'hod' : {
+                      'original-dir' : os.getcwd(),
+                      'stream' : True,
+                      # store all the info about clusters in this directory
+                      'user_state' : '/tmp/hodtest',
+                      'debug' : 3,
+                      'java-home' : os.getenv('JAVA_HOME'),
+                      'cluster' : 'dummy',
+                      'cluster-factor' : 1.8,
+                      'xrs-port-range' : (32768,65536),
+                      'allocate-wait-time' : 3600,
+                      'temp-dir' : '/tmp/hod'
+                    },
+            # just set everything to dummy. Need something to initialize the
+            # node pool description object.
+            'resource_manager' : {
+                                   'id' : 'dummy',
+                                   'batch-home' : 'dummy',
+                                   'queue' : 'dummy',
+                                 }
+          }
+    cfg['nodepooldesc'] = NodePoolDesc(cfg['resource_manager'])
+    return cfg
+
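+  # make setupConf callable without an instance (pre-decorator staticmethod idiom).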
+  setupConf = staticmethod(setupConf)
+
+  def setUp(self):
+
+    self.cfg = test_InvalidArgsOperations.setupConf()
+    # initialize the mock objects
+    self.log = MockLogger()
+    self.cluster = MockHadoopCluster()
+
+    # Use the test logger. This will be used for test verification.
+    self.client = hodRunner(self.cfg, log=self.log, cluster=self.cluster)
+    # Create the hodState object to set the test state you want.
+    self.state = hodState(self.cfg['hod']['user_state'])
+    if not os.path.exists(self.cfg['hod']['user_state']):
+      os.makedirs(self.cfg['hod']['user_state'])
+    p = os.path.join(self.cfg['hod']['user_state'], '%s.state' % TEST_CLUSTER_DATA_FILE)
+    # ensure the cluster data file exists, so that writes in the tests succeed.
+    f = open(p, 'w')
+    f.close()
+
+  def tearDown(self):
+    # clean up cluster data file and directory
+    p = os.path.join(self.cfg['hod']['user_state'], '%s.state' % TEST_CLUSTER_DATA_FILE)
+    os.remove(p)
+    os.rmdir(self.cfg['hod']['user_state'])
+
+  # Test that list works with deleted cluster directories - more than one invalid entry.
+  def testListInvalidDirectory(self):
+    userState = { os.path.join(TMP_DIR_PREFIX, 'testListInvalidDirectory1') : '123.dummy.id1',
+                  os.path.join(TMP_DIR_PREFIX, 'testListInvalidDirectory2') : '123.dummy.id2' }
+    self.__setupClusterState(userState)
+    self.client._op_list(['list'])
+    # assert that the required errors are logged.
+    for clusterDir in userState.keys():
+      self.assertTrue(self.log.hasMessage('cluster state unknown\t%s\t%s' \
+                                            % (userState[clusterDir], clusterDir), 'info'))
+
+    # simulate a case where a directory is deleted and created again, without deallocation
+    clusterDir = os.path.join(TMP_DIR_PREFIX, 'testListEmptyDirectory')
+    os.makedirs(clusterDir)
+    self.assertTrue(os.path.isdir(clusterDir))
+    userState = { clusterDir : '123.dummy.id3' }
+    self.__setupClusterState(userState, False)
+    self.client._op_list(['list'])
+    self.assertTrue(self.log.hasMessage('cluster state unknown\t%s\t%s' \
+                                          % (userState[clusterDir], clusterDir), 'info'))
+    os.rmdir(clusterDir)
+
+  # Test that info works with a deleted cluster directory
+  def testInfoInvalidDirectory(self):
+    clusterDir = os.path.join(TMP_DIR_PREFIX, 'testInfoInvalidDirectory')
+    userState = { clusterDir : '456.dummy.id' }
+    self.__setupClusterState(userState)
+    self.client._op_info(['info', clusterDir])
+    self.assertTrue(self.log.hasMessage("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (userState[clusterDir], clusterDir), 'critical'))
+
+    # simulate a case where a directory is deleted and created again, without deallocation
+    clusterDir = os.path.join(TMP_DIR_PREFIX, 'testInfoEmptyDirectory')
+    os.makedirs(clusterDir)
+    self.assertTrue(os.path.isdir(clusterDir))
+    userState = { clusterDir : '456.dummy.id1' }
+    self.__setupClusterState(userState, False)
+    self.client._op_info(['info', clusterDir])
+    self.assertTrue(self.log.hasMessage("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (userState[clusterDir], clusterDir), 'critical'))
+    os.rmdir(clusterDir)
+
+  # Test that info works with a non-existent cluster directory
+  def testInfoNonExistentDirectory(self):
+    clusterDir = '/tmp/hod/testInfoNonExistentDirectory'
+    self.client._op_info(['info', clusterDir])
+    self.assertTrue(self.log.hasMessage("'%s' is not a valid cluster directory." % (clusterDir), 'critical'))
+
+  # Test that deallocation works on a deleted cluster directory
+  # by clearing the job and removing the state
+  def testDeallocateInvalidDirectory(self):
+    clusterDir = os.path.join(TMP_DIR_PREFIX,'testDeallocateInvalidDirectory')
+    jobid = '789.dummy.id'
+    userState = { clusterDir : jobid }
+    self.__setupClusterState(userState)
+    self.client._op_deallocate(['deallocate', clusterDir])
+    # verify the job was deleted
+    self.assertTrue(self.cluster.wasOperationPerformed('delete_job', jobid))
+    # verify the appropriate messages were logged.
+    self.assertTrue(self.log.hasMessage("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (userState[clusterDir], clusterDir), 'critical'))
+    self.assertTrue(self.log.hasMessage("Freeing resources allocated to the cluster.", 'critical'))
+    # verify that the state information was cleared.
+    userState = self.state.read(TEST_CLUSTER_DATA_FILE)
+    self.assertFalse(clusterDir in userState.keys())
+
+    # simulate a case where a directory is deleted and created again, without deallocation
+    clusterDir = os.path.join(TMP_DIR_PREFIX,'testDeallocateEmptyDirectory')
+    os.makedirs(clusterDir)
+    self.assertTrue(os.path.isdir(clusterDir))
+    jobid = '789.dummy.id1'
+    userState = { clusterDir : jobid }
+    self.__setupClusterState(userState, False)
+    self.client._op_deallocate(['deallocate', clusterDir])
+    # verify the job was deleted
+    self.assertTrue(self.cluster.wasOperationPerformed('delete_job', jobid))
+    # verify the appropriate messages were logged.
+    self.assertTrue(self.log.hasMessage("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (userState[clusterDir], clusterDir), 'critical'))
+    self.assertTrue(self.log.hasMessage("Freeing resources allocated to the cluster.", 'critical'))
+    # verify that the state information was cleared.
+    userState = self.state.read(TEST_CLUSTER_DATA_FILE)
+    self.assertFalse(clusterDir in userState.keys())
+    os.rmdir(clusterDir)
+
+  # Test that deallocation works on a non-existent directory.
+  def testDeallocateNonExistentDirectory(self):
+    clusterDir = os.path.join(TMP_DIR_PREFIX,'testDeallocateNonExistentDirectory')
+    self.client._op_deallocate(['deallocate', clusterDir])
+    # no call to delete_job should have been made.
+    self.assertFalse(self.cluster.wasOperationPerformed('delete_job', None))
+    self.assertTrue(self.log.hasMessage("'%s' is not a valid cluster directory." % (clusterDir), 'critical'))
+
+  # Test that allocation on a previously deleted directory fails.
+  def testAllocateOnDeletedDirectory(self):
+    clusterDir = os.path.join(TMP_DIR_PREFIX, 'testAllocateOnDeletedDirectory')
+    os.makedirs(clusterDir)
+    self.assertTrue(os.path.isdir(clusterDir))
+    jobid = '1234.abc.com'
+    userState = { clusterDir : jobid }
+    self.__setupClusterState(userState, False)
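+    # try to allocate 3 nodes against the cluster directory that still has state recorded for it.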
+    self.client._op_allocate(['allocate', clusterDir, '3'])
+    self.assertTrue(self.log.hasMessage("Found a previously allocated cluster at cluster directory '%s'. Deallocate the cluster first." % (clusterDir), 'critical'))
+    os.rmdir(clusterDir)
+
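+  # Helper: record the given {clusterDir : jobid} map in the cluster state file,
+  # verify it reads back correctly, and optionally assert that the directories don't exist yet.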
+  def __setupClusterState(self, clusterStateMap, verifyDirIsAbsent=True):
+    for clusterDir in clusterStateMap.keys():
+      # ensure directory doesn't exist, just in case.
+      if verifyDirIsAbsent:
+        self.assertFalse(os.path.exists(clusterDir))
+    # set up required state.
+    self.state.write(TEST_CLUSTER_DATA_FILE, clusterStateMap)
+    # verify everything is stored correctly.
+    state = self.state.read(TEST_CLUSTER_DATA_FILE)
+    for clusterDir in clusterStateMap.keys():
+      self.assertTrue(clusterDir in state.keys())
+      self.assertEquals(clusterStateMap[clusterDir], state[clusterDir])
+
+class HodTestSuite(BaseTestSuite):
+  def __init__(self):
+    # suite setup
+    BaseTestSuite.__init__(self, __name__, excludes)
+    pass
+
+  def cleanUp(self):
+    # suite tearDown
+    pass
+
+def RunHodTests():
+  # modulename_suite
+  suite = HodTestSuite()
+  testResult = suite.runTests()
+  suite.cleanUp()
+  return testResult
+
+if __name__ == "__main__":
+  RunHodTests()