Browse Source

HADOOP-3153. Fixes the way HOD handles allocation if the user has no permissions to update the clusters state file. Contributed by Vinod Kumar Vavilapalli.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@647090 13f79535-47bb-0310-9956-ffa450edef68
Hemanth Yamijala 17 years ago
parent
commit
01c5a3e705
3 changed files with 221 additions and 50 deletions
  1. 14 0
      src/contrib/hod/CHANGES.txt
  2. 91 18
      src/contrib/hod/hodlib/Hod/hod.py
  3. 116 32
      src/contrib/hod/testing/testHod.py

+ 14 - 0
src/contrib/hod/CHANGES.txt

@@ -3,6 +3,16 @@ HOD Change Log
 
 
 Trunk (unreleased changes)
 Trunk (unreleased changes)
 
 
+  INCOMPATIBLE CHANGES
+
+  NEW FEATURES
+
+  IMPROVEMENTS
+ 
+  BUG FIXES
+
+Release 0.17.0 - Unreleased
+
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES
 
 
     HADOOP-3137. Modified build script to pick up version automatically
     HADOOP-3137. Modified build script to pick up version automatically
@@ -50,6 +60,10 @@ Trunk (unreleased changes)
     directory, script file and other options.
     directory, script file and other options.
     (Vinod Kumar Vavilapalli via yhemanth)
     (Vinod Kumar Vavilapalli via yhemanth)
 
 
+    HADOOP-3153. Fixes the way HOD handles allocation if the user has no
+    permissions to update the clusters state file.
+    (Vinod Kumar Vavilapalli via yhemanth)
+
 Release 0.16.2 - 2008-04-02
 Release 0.16.2 - 2008-04-02
 
 
   BUG FIXES
   BUG FIXES

+ 91 - 18
src/contrib/hod/hodlib/Hod/hod.py

@@ -15,7 +15,7 @@
 #limitations under the License.
 #limitations under the License.
 # -*- python -*-
 # -*- python -*-
 
 
-import sys, os, getpass, pprint, re, cPickle, random, shutil, time
+import sys, os, getpass, pprint, re, cPickle, random, shutil, time, errno
 
 
 import hodlib.Common.logger
 import hodlib.Common.logger
 
 
@@ -30,6 +30,23 @@ from hodlib.Common.nodepoolutil import NodePoolUtil
 from hodlib.Hod.hadoop import hadoopCluster, hadoopScript
 from hodlib.Hod.hadoop import hadoopCluster, hadoopScript
 
 
 CLUSTER_DATA_FILE = 'clusters'
 CLUSTER_DATA_FILE = 'clusters'
+INVALID_STATE_FILE_MSGS = \
+              [
+
+                "Requested operation cannot be performed. Cannot read %s: " + \
+                "Permission denied.",
+
+                "Requested operation cannot be performed. " + \
+                "Cannot write to %s: Permission denied.",
+
+                "Requested operation cannot be performed. " + \
+                "Cannot read/write to %s: Permission denied.",
+
+                "Cannot update %s: Permission denied. " + \
+                "Cluster is deallocated, but info and list " + \
+                "operations might show incorrect information.",
+
+              ]
 
 
 class hodState:
 class hodState:
   def __init__(self, store):
   def __init__(self, store):
@@ -50,7 +67,28 @@ class hodState:
       for item in os.listdir(self.__store):
       for item in os.listdir(self.__store):
         if item.endswith(self.__STORE_EXT):  
         if item.endswith(self.__STORE_EXT):  
           self.__stateFile = os.path.join(self.__store, item)          
           self.__stateFile = os.path.join(self.__store, item)          
+
+  def get_state_file(self):
+    return self.__stateFile
           
           
+  def checkStateFile(self, id=None, modes=(os.R_OK,)):
+    # is state file exists/readable/writable/both?
+    self.__set_state_file(id)
+
+    # return true if file doesn't exist, because HOD CAN create
+    # state file and so WILL have permissions to read and/or write
+    try:
+      os.stat(self.__stateFile)
+    except OSError, err:
+      if err.errno == errno.ENOENT: # error 2 (no such file)
+        return True
+
+    # file exists
+    ret = True
+    for mode in modes:
+      ret = ret and os.access(self.__stateFile, mode)
+    return ret
+
   def read(self, id=None):
   def read(self, id=None):
     info = {}
     info = {}
     
     
@@ -72,7 +110,7 @@ class hodState:
     self.__set_state_file(id)
     self.__set_state_file(id)
     if not os.path.exists(self.__stateFile):
     if not os.path.exists(self.__stateFile):
       self.clear(id)
       self.clear(id)
-      
+ 
     stateFile = open(self.__stateFile, 'w')
     stateFile = open(self.__stateFile, 'w')
     cPickle.dump(info, stateFile)
     cPickle.dump(info, stateFile)
     stateFile.close()
     stateFile.close()
@@ -215,6 +253,13 @@ class hodRunner:
         self.__opCode = 3
         self.__opCode = 3
         return
         return
 
 
+      if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, \
+                                              (os.R_OK, os.W_OK)):
+        self.__log.critical(INVALID_STATE_FILE_MSGS[2] % \
+                         self.__userState.get_state_file())
+        self.__opCode = 1
+        return
+
       clusterList = self.__userState.read(CLUSTER_DATA_FILE)
       clusterList = self.__userState.read(CLUSTER_DATA_FILE)
       if clusterDir in clusterList.keys():
       if clusterDir in clusterList.keys():
         self.__log.critical("Found a previously allocated cluster at cluster directory '%s'. Deallocate the cluster first." % (clusterDir))
         self.__log.critical("Found a previously allocated cluster at cluster directory '%s'. Deallocate the cluster first." % (clusterDir))
@@ -255,21 +300,28 @@ class hodRunner:
             # Allocation has gone through.
             # Allocation has gone through.
             # Don't care about interrupts any more
             # Don't care about interrupts any more
 
 
-            if allocateStatus == 0:
-              self.__set_cluster_state_info(os.environ, 
-                                            self.__cluster.hdfsInfo, 
-                                            self.__cluster.mapredInfo, 
-                                            self.__cluster.ringmasterXRS,
-                                            self.__cluster.jobId,
-                                            min, max)
-              self.__setup_cluster_state(clusterDir)
-              self.__clusterState.write(self.__cluster.jobId, 
-                                        self.__clusterStateInfo)
-              #  Do we need to check for interrupts here ??
-
-              self.__set_user_state_info( 
-                { clusterDir : self.__cluster.jobId, } )
-            self.__opCode = allocateStatus
+            try:
+              if allocateStatus == 0:
+                self.__set_cluster_state_info(os.environ, 
+                                              self.__cluster.hdfsInfo, 
+                                              self.__cluster.mapredInfo, 
+                                              self.__cluster.ringmasterXRS,
+                                              self.__cluster.jobId,
+                                              min, max)
+                self.__setup_cluster_state(clusterDir)
+                self.__clusterState.write(self.__cluster.jobId, 
+                                          self.__clusterStateInfo)
+                #  Do we need to check for interrupts here ??
+  
+                self.__set_user_state_info( 
+                  { clusterDir : self.__cluster.jobId, } )
+              self.__opCode = allocateStatus
+            except Exception, e:
+              # Some unknown problem.
+              self.__cleanup()
+              self.__cluster.deallocate(clusterDir, self.__clusterStateInfo)
+              self.__opCode = 1
+              raise Exception(e)
           elif self.__opCode == 12:
           elif self.__opCode == 12:
             self.__log.critical("Cluster %s already allocated." % clusterDir)
             self.__log.critical("Cluster %s already allocated." % clusterDir)
           elif self.__opCode == 10:
           elif self.__opCode == 10:
@@ -314,6 +366,11 @@ class hodRunner:
           # irrespective of whether deallocate failed or not\
           # irrespective of whether deallocate failed or not\
           # remove the cluster state.
           # remove the cluster state.
           self.__clusterState.clear()
           self.__clusterState.clear()
+          if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.W_OK,)):
+            self.__log.critical(INVALID_STATE_FILE_MSGS[3] % \
+                               self.__userState.get_state_file())
+            self.__opCode = 1
+            return
           self.__remove_cluster(clusterDir)
           self.__remove_cluster(clusterDir)
       else:
       else:
         self.__handle_invalid_cluster_directory(clusterDir, cleanUp=True)
         self.__handle_invalid_cluster_directory(clusterDir, cleanUp=True)
@@ -385,14 +442,25 @@ class hodRunner:
       self.__opCode = 3      
       self.__opCode = 3      
 
 
   def __handle_invalid_cluster_directory(self, clusterDir, cleanUp=False):
   def __handle_invalid_cluster_directory(self, clusterDir, cleanUp=False):
+    if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.R_OK,)):
+      self.__log.critical(INVALID_STATE_FILE_MSGS[0] % \
+                           self.__userState.get_state_file())
+      self.__opCode = 1
+      return
+
     clusterList = self.__userState.read(CLUSTER_DATA_FILE)
     clusterList = self.__userState.read(CLUSTER_DATA_FILE)
     if clusterDir in clusterList.keys():
     if clusterDir in clusterList.keys():
       # previously allocated cluster.
       # previously allocated cluster.
       self.__log.critical("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (clusterList[clusterDir], clusterDir))
       self.__log.critical("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (clusterList[clusterDir], clusterDir))
       if cleanUp:
       if cleanUp:
         self.__cluster.delete_job(clusterList[clusterDir])
         self.__cluster.delete_job(clusterList[clusterDir])
-        self.__remove_cluster(clusterDir)
         self.__log.critical("Freeing resources allocated to the cluster.")
         self.__log.critical("Freeing resources allocated to the cluster.")
+        if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.W_OK,)):
+          self.__log.critical(INVALID_STATE_FILE_MSGS[1] % \
+                              self.__userState.get_state_file())
+          self.__opCode = 1
+          return
+        self.__remove_cluster(clusterDir)
       self.__opCode = 3
       self.__opCode = 3
     else:
     else:
       self.__log.critical("'%s' is not a valid cluster directory." % (clusterDir))
       self.__log.critical("'%s' is not a valid cluster directory." % (clusterDir))
@@ -434,6 +502,11 @@ class hodRunner:
     try:
     try:
       opList = self.__check_operation(operation)
       opList = self.__check_operation(operation)
       if self.__opCode == 0:
       if self.__opCode == 0:
+        if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.R_OK,)):
+           self.__log.critical(INVALID_STATE_FILE_MSGS[0] % \
+                         self.__userState.get_state_file())
+           self.__opCode = 1
+           return self.__opCode
         getattr(self, "_op_%s" % opList[0])(opList)
         getattr(self, "_op_%s" % opList[0])(opList)
     except HodInterruptException, h:
     except HodInterruptException, h:
       self.__log.critical("op: %s failed because of a process interrupt." \
       self.__log.critical("op: %s failed because of a process interrupt." \

+ 116 - 32
src/contrib/hod/testing/testHod.py

@@ -20,6 +20,7 @@ rootDirectory   = re.sub("/testing/.*", "", myDirectory)
 
 
 sys.path.append(rootDirectory)
 sys.path.append(rootDirectory)
 
 
+import tempfile
 from testing.lib import BaseTestSuite, MockLogger, MockHadoopCluster
 from testing.lib import BaseTestSuite, MockLogger, MockHadoopCluster
 from hodlib.Hod.hod import hodRunner, hodState
 from hodlib.Hod.hod import hodRunner, hodState
 from hodlib.Common.desc import NodePoolDesc
 from hodlib.Common.desc import NodePoolDesc
@@ -27,46 +28,44 @@ from hodlib.Common.desc import NodePoolDesc
 excludes = []
 excludes = []
 
 
 # Information about all clusters is written to a file called clusters.state.
 # Information about all clusters is written to a file called clusters.state.
-TEST_CLUSTER_DATA_FILE='clusters'
+from hodlib.Hod.hod import CLUSTER_DATA_FILE as TEST_CLUSTER_DATA_FILE, \
+                           INVALID_STATE_FILE_MSGS
 
 
 # Temp directory prefix
 # Temp directory prefix
 TMP_DIR_PREFIX=os.path.join('/tmp', 'hod-%s' % (getpass.getuser()))
 TMP_DIR_PREFIX=os.path.join('/tmp', 'hod-%s' % (getpass.getuser()))
 
 
+# build a config object with all required keys for initializing hod.
+def setupConf():
+  cfg = {
+          'hod' : {
+                    'original-dir' : os.getcwd(),
+                    'stream' : True,
+                    # store all the info about clusters in this directory
+                    'user_state' : '/tmp/hodtest',
+                    'debug' : 3,
+                    'java-home' : os.getenv('JAVA_HOME'),
+                    'cluster' : 'dummy',
+                    'cluster-factor' : 1.8,
+                    'xrs-port-range' : (32768,65536),
+                    'allocate-wait-time' : 3600,
+                    'temp-dir' : '/tmp/hod'
+                  },
+          # just set everything to dummy. Need something to initialize the
+          # node pool description object.
+          'resource_manager' : {
+                                 'id' : 'dummy',
+                                 'batch-home' : 'dummy',
+                                 'queue' : 'dummy',
+                               }
+        }
+  cfg['nodepooldesc'] = NodePoolDesc(cfg['resource_manager'])
+  return cfg
+
 # Test class that defines methods to test invalid arguments to hod operations.
 # Test class that defines methods to test invalid arguments to hod operations.
 class test_InvalidArgsOperations(unittest.TestCase):
 class test_InvalidArgsOperations(unittest.TestCase):
-
-  # build a config object with all required keys for initializing hod.
-  def setupConf():
-    cfg = {
-            'hod' : {
-                      'original-dir' : os.getcwd(),
-                      'stream' : True,
-                      # store all the info about clusters in this directory
-                      'user_state' : '/tmp/hodtest',
-                      'debug' : 3,
-                      'java-home' : os.getenv('JAVA_HOME'),
-                      'cluster' : 'dummy',
-                      'cluster-factor' : 1.8,
-                      'xrs-port-range' : (32768,65536),
-                      'allocate-wait-time' : 3600,
-                      'temp-dir' : '/tmp/hod'
-                    },
-            # just set everything to dummy. Need something to initialize the
-            # node pool description object.
-            'resource_manager' : {
-                                   'id' : 'dummy',
-                                   'batch-home' : 'dummy',
-                                   'queue' : 'dummy',
-                                 }
-          }
-    cfg['nodepooldesc'] = NodePoolDesc(cfg['resource_manager'])
-    return cfg
-
-  setupConf = staticmethod(setupConf)
-
   def setUp(self):
   def setUp(self):
 
 
-    self.cfg = test_InvalidArgsOperations.setupConf()
+    self.cfg = setupConf()
     # initialize the mock objects
     # initialize the mock objects
     self.log = MockLogger()
     self.log = MockLogger()
     self.cluster = MockHadoopCluster()
     self.cluster = MockHadoopCluster()
@@ -202,6 +201,91 @@ class test_InvalidArgsOperations(unittest.TestCase):
       self.assertTrue(clusterDir in state.keys())
       self.assertTrue(clusterDir in state.keys())
       self.assertEquals(clusterStateMap[clusterDir], state[clusterDir])
       self.assertEquals(clusterStateMap[clusterDir], state[clusterDir])
 
 
+class test_InvalidHodStateFiles(unittest.TestCase):
+  def setUp(self):
+    self.rootDir = '/tmp/hod-%s' % getpass.getuser()
+    self.cfg = setupConf() # creat a conf
+    # Modify hod.user_state
+    self.cfg['hod']['user_state'] = tempfile.mkdtemp(dir=self.rootDir,
+                              prefix='HodTestSuite.test_InvalidHodStateFiles_')
+    self.log = MockLogger() # mock logger
+    self.cluster = MockHadoopCluster() # mock hadoop cluster
+    self.client = hodRunner(self.cfg, log=self.log, cluster=self.cluster)
+    self.state = hodState(self.cfg['hod']['user_state'])
+    self.statePath = os.path.join(self.cfg['hod']['user_state'], '%s.state' % \
+                                  TEST_CLUSTER_DATA_FILE)
+    self.clusterDir = tempfile.mkdtemp(dir=self.rootDir,
+                              prefix='HodTestSuite.test_InvalidHodStateFiles_')
+  
+  def testOperationWithInvalidStateFile(self):
+    jobid = '1234.hadoop.apache.org'
+    # create user state file with invalid permissions
+    stateFile = open(self.statePath, "w")
+    os.chmod(self.statePath, 000) # has no read/write permissions
+    self.client._hodRunner__cfg['hod']['operation'] = \
+                                             "info %s" % self.clusterDir
+    ret = self.client.operation()
+    os.chmod(self.statePath, 700) # restore permissions
+    stateFile.close()
+    os.remove(self.statePath)
+
+    # print self.log._MockLogger__logLines
+    self.assertTrue(self.log.hasMessage(INVALID_STATE_FILE_MSGS[0] % \
+                          os.path.realpath(self.statePath), 'critical'))
+    self.assertEquals(ret, 1)
+    
+  def testAllocateWithInvalidStateFile(self):
+    jobid = '1234.hadoop.apache.org'
+    # create user state file with invalid permissions
+    stateFile = open(self.statePath, "w")
+    os.chmod(self.statePath, 0400) # has no write permissions
+    self.client._hodRunner__cfg['hod']['operation'] = \
+                                        "allocate %s %s" % (self.clusterDir, '3')
+    ret = self.client.operation()
+    os.chmod(self.statePath, 700) # restore permissions
+    stateFile.close()
+    os.remove(self.statePath)
+
+    # print self.log._MockLogger__logLines
+    self.assertTrue(self.log.hasMessage(INVALID_STATE_FILE_MSGS[2] % \
+                        os.path.realpath(self.statePath), 'critical'))
+    self.assertEquals(ret, 1)
+  
+  def testAllocateWithInvalidStateStore(self):
+    jobid = '1234.hadoop.apache.org'
+    self.client._hodRunner__cfg['hod']['operation'] = \
+                                      "allocate %s %s" % (self.clusterDir, 3)
+
+    ###### check with no executable permissions ######
+    stateFile = open(self.statePath, "w") # create user state file
+    os.chmod(self.cfg['hod']['user_state'], 0600) 
+    ret = self.client.operation()
+    os.chmod(self.cfg['hod']['user_state'], 0700) # restore permissions
+    stateFile.close()
+    os.remove(self.statePath)
+    # print self.log._MockLogger__logLines
+    self.assertTrue(self.log.hasMessage(INVALID_STATE_FILE_MSGS[0] % \
+                          os.path.realpath(self.statePath), 'critical'))
+    self.assertEquals(ret, 1)
+    
+    ###### check with no write permissions ######
+    stateFile = open(self.statePath, "w") # create user state file
+    os.chmod(self.cfg['hod']['user_state'], 0500) 
+    ret = self.client.operation()
+    os.chmod(self.cfg['hod']['user_state'], 0700) # restore permissions
+    stateFile.close()
+    os.remove(self.statePath)
+    # print self.log._MockLogger__logLines
+    self.assertTrue(self.log.hasMessage(INVALID_STATE_FILE_MSGS[0] % \
+                          os.path.realpath(self.statePath), 'critical'))
+    self.assertEquals(ret, 1)
+
+  def tearDown(self):
+    if os.path.exists(self.clusterDir): os.rmdir(self.clusterDir)
+    if os.path.exists(self.cfg['hod']['user_state']):
+      os.rmdir(self.cfg['hod']['user_state'])
+
+
 class HodTestSuite(BaseTestSuite):
 class HodTestSuite(BaseTestSuite):
   def __init__(self):
   def __init__(self):
     # suite setup
     # suite setup