瀏覽代碼

Merge -r 647089:647090 from trunk onto 0.17 branch. Fixes HADOOP-3153.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.17@647138 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 17 年之前
父節點
當前提交
d587f180e3
共有 3 個文件被更改,包括 212 次插入 和 51 次删除
  1. 5 1
      src/contrib/hod/CHANGES.txt
  2. 91 18
      src/contrib/hod/hodlib/Hod/hod.py
  3. 116 32
      src/contrib/hod/testing/testHod.py

+ 5 - 1
src/contrib/hod/CHANGES.txt

@@ -1,7 +1,7 @@
 HOD Change Log
 
 
-Trunk (unreleased changes)
+Release 0.17.0 - Unreleased
 
   INCOMPATIBLE CHANGES
 
@@ -50,6 +50,10 @@ Trunk (unreleased changes)
     directory, script file and other options.
     (Vinod Kumar Vavilapalli via yhemanth)
 
+    HADOOP-3153. Fixes the way HOD handles allocation if the user has no
+    permissions to update the clusters state file.
+    (Vinod Kumar Vavilapalli via yhemanth)
+
 Release 0.16.2 - 2008-04-02
 
   BUG FIXES

+ 91 - 18
src/contrib/hod/hodlib/Hod/hod.py

@@ -15,7 +15,7 @@
 #limitations under the License.
 # -*- python -*-
 
-import sys, os, getpass, pprint, re, cPickle, random, shutil, time
+import sys, os, getpass, pprint, re, cPickle, random, shutil, time, errno
 
 import hodlib.Common.logger
 
@@ -30,6 +30,23 @@ from hodlib.Common.nodepoolutil import NodePoolUtil
 from hodlib.Hod.hadoop import hadoopCluster, hadoopScript
 
 CLUSTER_DATA_FILE = 'clusters'
+INVALID_STATE_FILE_MSGS = \
+              [
+
+                "Requested operation cannot be performed. Cannot read %s: " + \
+                "Permission denied.",
+
+                "Requested operation cannot be performed. " + \
+                "Cannot write to %s: Permission denied.",
+
+                "Requested operation cannot be performed. " + \
+                "Cannot read/write to %s: Permission denied.",
+
+                "Cannot update %s: Permission denied. " + \
+                "Cluster is deallocated, but info and list " + \
+                "operations might show incorrect information.",
+
+              ]
 
 class hodState:
   def __init__(self, store):
@@ -50,7 +67,28 @@ class hodState:
       for item in os.listdir(self.__store):
         if item.endswith(self.__STORE_EXT):  
           self.__stateFile = os.path.join(self.__store, item)          
+
+  def get_state_file(self):
+    return self.__stateFile
           
+  def checkStateFile(self, id=None, modes=(os.R_OK,)):
+    # check whether the state file exists and is accessible in the given modes
+    self.__set_state_file(id)
+
+    # return true if file doesn't exist, because HOD CAN create
+    # state file and so WILL have permissions to read and/or write
+    try:
+      os.stat(self.__stateFile)
+    except OSError, err:
+      if err.errno == errno.ENOENT: # error 2 (no such file)
+        return True
+
+    # file exists
+    ret = True
+    for mode in modes:
+      ret = ret and os.access(self.__stateFile, mode)
+    return ret
+
   def read(self, id=None):
     info = {}
     
@@ -72,7 +110,7 @@ class hodState:
     self.__set_state_file(id)
     if not os.path.exists(self.__stateFile):
       self.clear(id)
-      
+ 
     stateFile = open(self.__stateFile, 'w')
     cPickle.dump(info, stateFile)
     stateFile.close()
@@ -215,6 +253,13 @@ class hodRunner:
         self.__opCode = 3
         return
 
+      if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, \
+                                              (os.R_OK, os.W_OK)):
+        self.__log.critical(INVALID_STATE_FILE_MSGS[2] % \
+                         self.__userState.get_state_file())
+        self.__opCode = 1
+        return
+
       clusterList = self.__userState.read(CLUSTER_DATA_FILE)
       if clusterDir in clusterList.keys():
         self.__log.critical("Found a previously allocated cluster at cluster directory '%s'. Deallocate the cluster first." % (clusterDir))
@@ -255,21 +300,28 @@ class hodRunner:
             # Allocation has gone through.
             # Don't care about interrupts any more
 
-            if allocateStatus == 0:
-              self.__set_cluster_state_info(os.environ, 
-                                            self.__cluster.hdfsInfo, 
-                                            self.__cluster.mapredInfo, 
-                                            self.__cluster.ringmasterXRS,
-                                            self.__cluster.jobId,
-                                            min, max)
-              self.__setup_cluster_state(clusterDir)
-              self.__clusterState.write(self.__cluster.jobId, 
-                                        self.__clusterStateInfo)
-              #  Do we need to check for interrupts here ??
-
-              self.__set_user_state_info( 
-                { clusterDir : self.__cluster.jobId, } )
-            self.__opCode = allocateStatus
+            try:
+              if allocateStatus == 0:
+                self.__set_cluster_state_info(os.environ, 
+                                              self.__cluster.hdfsInfo, 
+                                              self.__cluster.mapredInfo, 
+                                              self.__cluster.ringmasterXRS,
+                                              self.__cluster.jobId,
+                                              min, max)
+                self.__setup_cluster_state(clusterDir)
+                self.__clusterState.write(self.__cluster.jobId, 
+                                          self.__clusterStateInfo)
+                #  Do we need to check for interrupts here ??
+  
+                self.__set_user_state_info( 
+                  { clusterDir : self.__cluster.jobId, } )
+              self.__opCode = allocateStatus
+            except Exception, e:
+              # Some unknown problem.
+              self.__cleanup()
+              self.__cluster.deallocate(clusterDir, self.__clusterStateInfo)
+              self.__opCode = 1
+              raise Exception(e)
           elif self.__opCode == 12:
             self.__log.critical("Cluster %s already allocated." % clusterDir)
           elif self.__opCode == 10:
@@ -314,6 +366,11 @@ class hodRunner:
           # irrespective of whether deallocate failed or not\
           # remove the cluster state.
           self.__clusterState.clear()
+          if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.W_OK,)):
+            self.__log.critical(INVALID_STATE_FILE_MSGS[3] % \
+                               self.__userState.get_state_file())
+            self.__opCode = 1
+            return
           self.__remove_cluster(clusterDir)
       else:
         self.__handle_invalid_cluster_directory(clusterDir, cleanUp=True)
@@ -385,14 +442,25 @@ class hodRunner:
       self.__opCode = 3      
 
   def __handle_invalid_cluster_directory(self, clusterDir, cleanUp=False):
+    if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.R_OK,)):
+      self.__log.critical(INVALID_STATE_FILE_MSGS[0] % \
+                           self.__userState.get_state_file())
+      self.__opCode = 1
+      return
+
     clusterList = self.__userState.read(CLUSTER_DATA_FILE)
     if clusterDir in clusterList.keys():
       # previously allocated cluster.
       self.__log.critical("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (clusterList[clusterDir], clusterDir))
       if cleanUp:
         self.__cluster.delete_job(clusterList[clusterDir])
-        self.__remove_cluster(clusterDir)
         self.__log.critical("Freeing resources allocated to the cluster.")
+        if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.W_OK,)):
+          self.__log.critical(INVALID_STATE_FILE_MSGS[1] % \
+                              self.__userState.get_state_file())
+          self.__opCode = 1
+          return
+        self.__remove_cluster(clusterDir)
       self.__opCode = 3
     else:
       self.__log.critical("'%s' is not a valid cluster directory." % (clusterDir))
@@ -434,6 +502,11 @@ class hodRunner:
     try:
       opList = self.__check_operation(operation)
       if self.__opCode == 0:
+        if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.R_OK,)):
+           self.__log.critical(INVALID_STATE_FILE_MSGS[0] % \
+                         self.__userState.get_state_file())
+           self.__opCode = 1
+           return self.__opCode
         getattr(self, "_op_%s" % opList[0])(opList)
     except HodInterruptException, h:
       self.__log.critical("op: %s failed because of a process interrupt." \

+ 116 - 32
src/contrib/hod/testing/testHod.py

@@ -20,6 +20,7 @@ rootDirectory   = re.sub("/testing/.*", "", myDirectory)
 
 sys.path.append(rootDirectory)
 
+import tempfile
 from testing.lib import BaseTestSuite, MockLogger, MockHadoopCluster
 from hodlib.Hod.hod import hodRunner, hodState
 from hodlib.Common.desc import NodePoolDesc
@@ -27,46 +28,44 @@ from hodlib.Common.desc import NodePoolDesc
 excludes = []
 
 # Information about all clusters is written to a file called clusters.state.
-TEST_CLUSTER_DATA_FILE='clusters'
+from hodlib.Hod.hod import CLUSTER_DATA_FILE as TEST_CLUSTER_DATA_FILE, \
+                           INVALID_STATE_FILE_MSGS
 
 # Temp directory prefix
 TMP_DIR_PREFIX=os.path.join('/tmp', 'hod-%s' % (getpass.getuser()))
 
+# build a config object with all required keys for initializing hod.
+def setupConf():
+  cfg = {
+          'hod' : {
+                    'original-dir' : os.getcwd(),
+                    'stream' : True,
+                    # store all the info about clusters in this directory
+                    'user_state' : '/tmp/hodtest',
+                    'debug' : 3,
+                    'java-home' : os.getenv('JAVA_HOME'),
+                    'cluster' : 'dummy',
+                    'cluster-factor' : 1.8,
+                    'xrs-port-range' : (32768,65536),
+                    'allocate-wait-time' : 3600,
+                    'temp-dir' : '/tmp/hod'
+                  },
+          # just set everything to dummy. Need something to initialize the
+          # node pool description object.
+          'resource_manager' : {
+                                 'id' : 'dummy',
+                                 'batch-home' : 'dummy',
+                                 'queue' : 'dummy',
+                               }
+        }
+  cfg['nodepooldesc'] = NodePoolDesc(cfg['resource_manager'])
+  return cfg
+
 # Test class that defines methods to test invalid arguments to hod operations.
 class test_InvalidArgsOperations(unittest.TestCase):
-
-  # build a config object with all required keys for initializing hod.
-  def setupConf():
-    cfg = {
-            'hod' : {
-                      'original-dir' : os.getcwd(),
-                      'stream' : True,
-                      # store all the info about clusters in this directory
-                      'user_state' : '/tmp/hodtest',
-                      'debug' : 3,
-                      'java-home' : os.getenv('JAVA_HOME'),
-                      'cluster' : 'dummy',
-                      'cluster-factor' : 1.8,
-                      'xrs-port-range' : (32768,65536),
-                      'allocate-wait-time' : 3600,
-                      'temp-dir' : '/tmp/hod'
-                    },
-            # just set everything to dummy. Need something to initialize the
-            # node pool description object.
-            'resource_manager' : {
-                                   'id' : 'dummy',
-                                   'batch-home' : 'dummy',
-                                   'queue' : 'dummy',
-                                 }
-          }
-    cfg['nodepooldesc'] = NodePoolDesc(cfg['resource_manager'])
-    return cfg
-
-  setupConf = staticmethod(setupConf)
-
   def setUp(self):
 
-    self.cfg = test_InvalidArgsOperations.setupConf()
+    self.cfg = setupConf()
     # initialize the mock objects
     self.log = MockLogger()
     self.cluster = MockHadoopCluster()
@@ -202,6 +201,91 @@ class test_InvalidArgsOperations(unittest.TestCase):
       self.assertTrue(clusterDir in state.keys())
       self.assertEquals(clusterStateMap[clusterDir], state[clusterDir])
 
+class test_InvalidHodStateFiles(unittest.TestCase):
+  def setUp(self):
+    self.rootDir = '/tmp/hod-%s' % getpass.getuser()
+    self.cfg = setupConf() # create a conf
+    # Modify hod.user_state
+    self.cfg['hod']['user_state'] = tempfile.mkdtemp(dir=self.rootDir,
+                              prefix='HodTestSuite.test_InvalidHodStateFiles_')
+    self.log = MockLogger() # mock logger
+    self.cluster = MockHadoopCluster() # mock hadoop cluster
+    self.client = hodRunner(self.cfg, log=self.log, cluster=self.cluster)
+    self.state = hodState(self.cfg['hod']['user_state'])
+    self.statePath = os.path.join(self.cfg['hod']['user_state'], '%s.state' % \
+                                  TEST_CLUSTER_DATA_FILE)
+    self.clusterDir = tempfile.mkdtemp(dir=self.rootDir,
+                              prefix='HodTestSuite.test_InvalidHodStateFiles_')
+  
+  def testOperationWithInvalidStateFile(self):
+    jobid = '1234.hadoop.apache.org'
+    # create user state file with invalid permissions
+    stateFile = open(self.statePath, "w")
+    os.chmod(self.statePath, 000) # has no read/write permissions
+    self.client._hodRunner__cfg['hod']['operation'] = \
+                                             "info %s" % self.clusterDir
+    ret = self.client.operation()
+    os.chmod(self.statePath, 700) # restore permissions
+    stateFile.close()
+    os.remove(self.statePath)
+
+    # print self.log._MockLogger__logLines
+    self.assertTrue(self.log.hasMessage(INVALID_STATE_FILE_MSGS[0] % \
+                          os.path.realpath(self.statePath), 'critical'))
+    self.assertEquals(ret, 1)
+    
+  def testAllocateWithInvalidStateFile(self):
+    jobid = '1234.hadoop.apache.org'
+    # create user state file with invalid permissions
+    stateFile = open(self.statePath, "w")
+    os.chmod(self.statePath, 0400) # has no write permissions
+    self.client._hodRunner__cfg['hod']['operation'] = \
+                                        "allocate %s %s" % (self.clusterDir, '3')
+    ret = self.client.operation()
+    os.chmod(self.statePath, 700) # restore permissions
+    stateFile.close()
+    os.remove(self.statePath)
+
+    # print self.log._MockLogger__logLines
+    self.assertTrue(self.log.hasMessage(INVALID_STATE_FILE_MSGS[2] % \
+                        os.path.realpath(self.statePath), 'critical'))
+    self.assertEquals(ret, 1)
+  
+  def testAllocateWithInvalidStateStore(self):
+    jobid = '1234.hadoop.apache.org'
+    self.client._hodRunner__cfg['hod']['operation'] = \
+                                      "allocate %s %s" % (self.clusterDir, 3)
+
+    ###### check with no executable permissions ######
+    stateFile = open(self.statePath, "w") # create user state file
+    os.chmod(self.cfg['hod']['user_state'], 0600) 
+    ret = self.client.operation()
+    os.chmod(self.cfg['hod']['user_state'], 0700) # restore permissions
+    stateFile.close()
+    os.remove(self.statePath)
+    # print self.log._MockLogger__logLines
+    self.assertTrue(self.log.hasMessage(INVALID_STATE_FILE_MSGS[0] % \
+                          os.path.realpath(self.statePath), 'critical'))
+    self.assertEquals(ret, 1)
+    
+    ###### check with no write permissions ######
+    stateFile = open(self.statePath, "w") # create user state file
+    os.chmod(self.cfg['hod']['user_state'], 0500) 
+    ret = self.client.operation()
+    os.chmod(self.cfg['hod']['user_state'], 0700) # restore permissions
+    stateFile.close()
+    os.remove(self.statePath)
+    # print self.log._MockLogger__logLines
+    self.assertTrue(self.log.hasMessage(INVALID_STATE_FILE_MSGS[0] % \
+                          os.path.realpath(self.statePath), 'critical'))
+    self.assertEquals(ret, 1)
+
+  def tearDown(self):
+    if os.path.exists(self.clusterDir): os.rmdir(self.clusterDir)
+    if os.path.exists(self.cfg['hod']['user_state']):
+      os.rmdir(self.cfg['hod']['user_state'])
+
+
 class HodTestSuite(BaseTestSuite):
   def __init__(self):
     # suite setup