Explorar o código

Merge -r 753111:753113 from trunk onto 0.20 branch. Fixes HADOOP-5449.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20@753114 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das %!s(int64=16) %!d(string=hai) anos
pai
achega
1b9e39477e

+ 3 - 0
CHANGES.txt

@@ -755,6 +755,9 @@ Release 0.19.2 - Unreleased
  
     HADOOP-5446. Restore TaskTracker metrics. (cdouglas)
 
+    HADOOP-5449. Fixes the history cleaner thread. 
+    (Amareshwari Sriramadasu via ddas)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

+ 22 - 12
src/mapred/org/apache/hadoop/mapred/JobHistory.java

@@ -98,6 +98,7 @@ public class JobHistory {
   private static final String SECONDARY_FILE_SUFFIX = ".recover";
   private static long jobHistoryBlockSize = 0;
   private static String jobtrackerHostname;
+  private static JobConf jtConf;
   /**
    * Record types are identifiers for each line of log in history files. 
    * A record type appears as the first token in a single line of log. 
@@ -163,6 +164,7 @@ public class JobHistory {
       jobHistoryBlockSize = 
         conf.getLong("mapred.jobtracker.job.history.block.size", 
                      3 * 1024 * 1024);
+      jtConf = conf;
     } catch(IOException e) {
         LOG.error("Failed to initialize JobHistory log file", e); 
         disableHistory = true;
@@ -1678,7 +1680,7 @@ public class JobHistory {
     static final long THIRTY_DAYS_IN_MS = 30 * ONE_DAY_IN_MS;
     private long now; 
     private static boolean isRunning = false; 
-    private static long lastRan; 
+    private static long lastRan = 0; 
 
     /**
      * Cleans up history data. 
@@ -1689,26 +1691,34 @@ public class JobHistory {
       }
       now = System.currentTimeMillis();
       // clean history only once a day at max
-      if (lastRan ==0 || (now - lastRan) < ONE_DAY_IN_MS){
+      if (lastRan != 0 && (now - lastRan) < ONE_DAY_IN_MS) {
         return; 
       }
       lastRan = now;  
       isRunning = true; 
-      File[] oldFiles = new File(LOG_DIR).listFiles(new FileFilter(){
-          public boolean accept(File file){
-            // delete if older than 30 days
-            if (now - file.lastModified() > THIRTY_DAYS_IN_MS){
-              return true; 
+      try {
+        Path logDir = new Path(LOG_DIR);
+        FileSystem fs = logDir.getFileSystem(jtConf);
+        FileStatus[] historyFiles = fs.listStatus(logDir);
+        // delete if older than 30 days
+        if (historyFiles != null) {
+          for (FileStatus f : historyFiles) {
+            if (now - f.getModificationTime() > THIRTY_DAYS_IN_MS) {
+              fs.delete(f.getPath(), true); 
+              LOG.info("Deleting old history file : " + f.getPath());
             }
-            return false; 
           }
-        });
-      for(File f : oldFiles){
-        f.delete(); 
-        LOG.info("Deleting old history file : " + f.getName());
+        }
+      } catch (IOException ie) {
+        LOG.info("Error cleaning up history directory" + 
+                 StringUtils.stringifyException(ie));
       }
       isRunning = false; 
     }
+    
+    static long getLastRan() {
+      return lastRan;
+    }
   }
 
   /**

+ 4 - 0
src/test/org/apache/hadoop/mapred/TestJobHistory.java

@@ -970,16 +970,20 @@ public class TestJobHistory extends TestCase {
       // existing in history file
       RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
       validateJobHistoryJobStatus(job.getID(), conf, "SUCCESS");
+      long historyCleanerRanAt = JobHistory.HistoryCleaner.getLastRan();
+      assertTrue(historyCleanerRanAt != 0);
       
       // Run a job that will be failed and validate its job status
       // existing in history file
       job = UtilsForTests.runJobFail(conf, inDir, outDir);
       validateJobHistoryJobStatus(job.getID(), conf, "FAILED");
+      assertTrue(historyCleanerRanAt == JobHistory.HistoryCleaner.getLastRan());
       
       // Run a job that will be killed and validate its job status
       // existing in history file
       job = UtilsForTests.runJobKill(conf, inDir, outDir);
       validateJobHistoryJobStatus(job.getID(), conf, "KILLED");
+      assertTrue(historyCleanerRanAt == JobHistory.HistoryCleaner.getLastRan());
       
     } finally {
       if (mr != null) {