Преглед изворног кода

Merge -r 667032:667033 from trunk onto 0.18 branch. Fixes HADOOP-3523.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.18@667035 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das пре 17 година
родитељ
комит
f4e775b2ee

+ 6 - 4
src/contrib/hod/CHANGES.txt

@@ -1,7 +1,6 @@
 HOD Change Log
 HOD Change Log
 
 
-
-Trunk (unreleased changes)
+Release 0.18.0 - Unreleased
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES
 
 
@@ -29,10 +28,13 @@ Trunk (unreleased changes)
  
  
   BUG FIXES
   BUG FIXES
 
 
-    HADOOP-2961: Avoids unnecessary checks for some configuration parameters
+    HADOOP-2961. Avoids unnecessary checks for some configuration parameters
     related to service configuration. (Vinod Kumar Vavilapalli via ddas)
     related to service configuration. (Vinod Kumar Vavilapalli via ddas)
 
 
-Release 0.17.0 - Unreleased
+    HADOOP-3523. Fixes auto-deallocation of cluster if job id is not found in
+    Torque's job list (Hemanth Yamijala via ddas)
+
+Release 0.17.0 - 2008-05-18
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES
 
 

+ 3 - 1
src/contrib/hod/hodlib/Hod/hadoop.py

@@ -431,7 +431,9 @@ class hadoopCluster:
     """Returns True if the JobId that represents this cluster
     """Returns True if the JobId that represents this cluster
        is in the Completed or exiting state."""
        is in the Completed or exiting state."""
     jobInfo = self.__nodePool.getJobInfo(jobId)
     jobInfo = self.__nodePool.getJobInfo(jobId)
-    state = jobInfo['job_state']
+    state = None
+    if jobInfo is not None and jobInfo.has_key('job_state'):
+      state = jobInfo['job_state']
     return ((state == 'C') or (state == 'E'))
     return ((state == 'C') or (state == 'E'))
 
 
   def cleanup(self):
   def cleanup(self):

+ 1 - 1
src/contrib/hod/hodlib/Hod/hod.py

@@ -307,7 +307,7 @@ class hodRunner:
           self.__remove_cluster(clusterDir)
           self.__remove_cluster(clusterDir)
           self.__clusterState.clear()
           self.__clusterState.clear()
         else:
         else:
-          self.__log.critical("Found a previously allocated cluster at cluster directory '%s'. Deallocate the cluster first." % (clusterDir))
+          self.__log.critical("Found a previously allocated cluster at cluster directory '%s'. HOD cannot determine if this cluster can be automatically deallocated. Deallocate the cluster if it is unused." % (clusterDir))
           self.__opCode = 12
           self.__opCode = 12
           return
           return
  
  

+ 8 - 1
src/contrib/hod/hodlib/NodePools/torque.py

@@ -270,7 +270,8 @@ class TorquePool(NodePool):
 
 
   def getJobInfo(self, jobId=None):
   def getJobInfo(self, jobId=None):
     #torque error code when credentials fail, a temporary condition sometimes.
     #torque error code when credentials fail, a temporary condition sometimes.
-    credFailureErrorCode = 171 
+    credFailureErrorCode = 171
+    jobNonExistentErrorCode = 153
     credFailureRetries = 10
     credFailureRetries = 10
     i = 0
     i = 0
     self.__jobInfo = None
     self.__jobInfo = None
@@ -283,6 +284,12 @@ class TorquePool(NodePool):
       if exitCode == 0:
       if exitCode == 0:
         self.__jobInfo = qstatInfo
         self.__jobInfo = qstatInfo
         break
         break
+      elif exitCode == jobNonExistentErrorCode:
+        # This really means that the job completed
+        # However, setting only job_state for now, not 
+        # any other attributes, as none seem required.
+        self.__jobInfo = { 'job_state' : 'C' }
+        break
       else:
       else:
         if exitCode == credFailureErrorCode:
         if exitCode == credFailureErrorCode:
           time.sleep(1)
           time.sleep(1)

+ 1 - 1
src/contrib/hod/hodlib/Schedulers/torque.py

@@ -93,7 +93,7 @@ class torqueInterface:
     
     
     exitCode = qstatProcess.exit_code()
     exitCode = qstatProcess.exit_code()
     if exitCode > 0:
     if exitCode > 0:
-      self.__log.error('qstat error: %s' % qstatProcess.exit_status_string())
+      self.__log.warn('qstat error: %s' % qstatProcess.exit_status_string())
     else:
     else:
       qstatInfo = {}
       qstatInfo = {}
       for line in qstatProcess.output():
       for line in qstatProcess.output():

+ 4 - 1
src/contrib/hod/testing/testHod.py

@@ -185,7 +185,10 @@ class test_InvalidArgsOperations(unittest.TestCase):
     userState = { clusterDir : jobid }
     userState = { clusterDir : jobid }
     self.__setupClusterState(userState, False)
     self.__setupClusterState(userState, False)
     self.client._op_allocate(['allocate', clusterDir, '3'])
     self.client._op_allocate(['allocate', clusterDir, '3'])
-    self.assertTrue(self.log.hasMessage("Found a previously allocated cluster at cluster directory '%s'. Deallocate the cluster first." % (clusterDir), 'critical'))
+    self.assertTrue(self.log.hasMessage("Found a previously allocated cluster at "\
+                      "cluster directory '%s'. HOD cannot determine if this cluster "\
+                      "can be automatically deallocated. Deallocate the cluster if it "\
+                      "is unused." % (clusterDir), 'critical'))
     os.rmdir(clusterDir)
     os.rmdir(clusterDir)
 
 
   def __setupClusterState(self, clusterStateMap, verifyDirIsAbsent=True):
   def __setupClusterState(self, clusterStateMap, verifyDirIsAbsent=True):

+ 2 - 1
src/contrib/hod/testing/testRingmasterRPCs.py

@@ -68,7 +68,8 @@ class test_GetCommand(unittest.TestCase):
                             'batch-home': '/home/y/'
                             'batch-home': '/home/y/'
                           }, 
                           }, 
        'ringmaster': {
        'ringmaster': {
-                      'max-connect' : 2
+                      'max-connect' : 2,
+                      'max-master-failures' : 5
                      }, 
                      }, 
        'hodring': {
        'hodring': {
                   }, 
                   },