Browse source code

MAPREDUCE-2059. RecoveryManager excludes jobtracker.info from the list of jobs to be recovered. Contributed by Subroto Sanyal.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.22@1204275 13f79535-47bb-0310-9956-ffa450edef68
Konstantin Shvachko 13 years ago
parent
commit
33c0ce1bd6
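
The change itself is small: when the JobTracker scans its system directory for jobs to recover, it now applies a PathFilter that skips any entry whose name starts with the restart-count file's name (jobtracker.info), so the bookkeeping file is never mistaken for a job. As a minimal standalone sketch of the same idea, outside the JobTracker (the class and method names here are hypothetical, not part of the commit):

    import java.io.IOException;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    // Hypothetical helper mirroring the commit's approach: list a directory,
    // skipping entries whose names start with a reserved prefix such as
    // "jobtracker.info", so bookkeeping files are never treated as job files.
    class RecoveryListingSketch {
      static FileStatus[] listJobFiles(FileSystem fs, Path sysDir,
          final String reservedPrefix) throws IOException {
        return fs.listStatus(sysDir, new PathFilter() {
          @Override
          public boolean accept(Path p) {
            // Reject jobtracker.info and its temporary variants;
            // accept everything else as a recovery candidate.
            return !p.getName().startsWith(reservedPrefix);
          }
        });
      }
    }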

+ 3 - 0
mapreduce/CHANGES.txt

@@ -641,6 +641,9 @@ Release 0.22.0 - Unreleased
     MAPREDUCE-3429. Few contrib tests are failing because of the missing
     commons-lang dependency (cos)
 
+    MAPREDUCE-2059. RecoveryManager excludes jobtracker.info from the list of
+    jobs to be recovered. (Subroto Sanyal via shv)
+
 Release 0.21.1 - Unreleased
 
   NEW FEATURES

+ 23 - 5
mapreduce/src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -65,6 +65,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.Text;
@@ -1553,18 +1554,18 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
           }
         } catch (FileNotFoundException fnf) {} //ignore
         // Make sure that the backup data is preserved
-        FileStatus[] systemDirData;
+        FileStatus[] jobDirData;
         try {
-          systemDirData = fs.listStatus(this.systemDir);
+          jobDirData = getJobFilesForRecovery(fs, systemDir);
         } catch (FileNotFoundException fnfe) {
-          systemDirData = null;
+          jobDirData = null;
         }
         
         // Check if the history is enabled .. as we can't have persistence with 
         // history disabled
         if (conf.getBoolean(JT_RESTART_ENABLED, false) 
-            && systemDirData != null) {
-          for (FileStatus status : systemDirData) {
+            && jobDirData != null) {
+          for (FileStatus status : jobDirData) {
             try {
               recoveryManager.addJobForRecovery(status);
             } catch (Throwable t) {
@@ -1642,6 +1643,23 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     completedJobStatusStore = new CompletedJobStatusStore(conf, aclsManager);
   }
 
+  /**
+   * Expects Recovery Manager to be initialized
+   */
+  FileStatus[] getJobFilesForRecovery(FileSystem fs, Path jobFolderPath)
+      throws IOException {
+    return fs.listStatus(jobFolderPath, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        if (path.getName().startsWith(
+            recoveryManager.getRestartCountFile().getName())) {
+          return false;
+        }
+        return true;
+      }
+    });
+  }
+
   /**
    * Recursively delete the contents of a directory without deleting the
    * directory itself.
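
The new helper is package-private and takes the FileSystem and directory explicitly, which is what lets the test below drive it directly. A hedged sketch of the call pattern, assuming a JobTracker instance jt whose recovery manager has already been initialized (jt is hypothetical; the method names come from the diff above):

    // The filter reads recoveryManager.getRestartCountFile() to learn which
    // prefix to exclude, so the recovery manager must be initialized first.
    FileSystem fs = jt.getFileSystem();
    Path sysDir = new Path(jt.getSystemDir());
    FileStatus[] candidates = jt.getJobFilesForRecovery(fs, sysDir);
    for (FileStatus status : candidates) {
      jt.recoveryManager.addJobForRecovery(status); // only real job files reach here
    }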

+ 53 - 13
mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java

@@ -18,7 +18,8 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.File;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
+
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 
@@ -28,15 +29,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
-import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
 import org.apache.hadoop.mapreduce.MRConfig;
-
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.createQueuesConfigFile;
-import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
+import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -85,8 +84,8 @@ public class TestRecoveryManager extends TestCase {
     RunningJob rJob1 = (new JobClient(job1)).submitJob(job1);
     LOG.info("Submitted job " + rJob1.getID());
     
-    while (rJob1.mapProgress() < 0.5f) {
-      LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
+    while (rJob1.mapProgress() < 0.2f) {
+      LOG.info("Waiting for job " + rJob1.getID() + " to be 20% done");
       UtilsForTests.waitFor(100);
     }
         
@@ -100,8 +99,8 @@ public class TestRecoveryManager extends TestCase {
     RunningJob rJob2 = (new JobClient(job2)).submitJob(job2);
     LOG.info("Submitted job " + rJob2.getID());
     
-    while (rJob2.mapProgress() < 0.5f) {
-      LOG.info("Waiting for job " + rJob2.getID() + " to be 50% done");
+    while (rJob2.mapProgress() < 0.2f) {
+      LOG.info("Waiting for job " + rJob2.getID() + " to be 20% done");
       UtilsForTests.waitFor(100);
     }
     
@@ -146,7 +145,7 @@ public class TestRecoveryManager extends TestCase {
    * Tests the {@link JobTracker.RecoveryManager} against the exceptions thrown 
    * during recovery. It does the following :
    *  - submits a job with HIGH priority and x tasks
-   *  - allows it to complete 50%
+   *  - allows it to complete 20%
    *  - submits another job with normal priority and y tasks
    *  - kills the jobtracker
    *  - restarts the jobtracker with max-tasks-per-job such that 
@@ -181,8 +180,8 @@ public class TestRecoveryManager extends TestCase {
     RunningJob rJob1 = jc.submitJob(job1);
     LOG.info("Submitted first job " + rJob1.getID());
     
-    while (rJob1.mapProgress() < 0.5f) {
-      LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
+    while (rJob1.mapProgress() < 0.2f) {
+      LOG.info("Waiting for job " + rJob1.getID() + " to be 20% done");
       UtilsForTests.waitFor(100);
     }
     
@@ -313,7 +312,7 @@ public class TestRecoveryManager extends TestCase {
     assertFalse("Info file exists after update failure", 
                 fs.exists(restartFile));
     assertFalse("Temporary restart-file exists after update failure", 
-                fs.exists(restartFile));
+                fs.exists(tmpRestartFile));
 
     // start 1 data node
     dfs.startDataNodes(conf, 1, true, null, null, null, null);
@@ -326,5 +325,46 @@ public class TestRecoveryManager extends TestCase {
       failed = true;
     }
     assertFalse("JobTracker failed to create info files with datanodes!!!", failed);
+    dfs.shutdown();
+  }
+  
+  public void testRestartCountFileWillNotBeListedForRecovery() throws Exception {
+    MiniDFSCluster dfsCluster = null;
+    MiniMRCluster mrCluster = null;
+    try {
+      Configuration conf = new Configuration();
+      conf.setBoolean(JTConfig.JT_RESTART_ENABLED, true);
+      System
+          .setProperty("hadoop.log.dir", System.getProperty("java.io.tmpdir"));
+      dfsCluster = 
+        new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).build();
+      mrCluster = 
+        new MiniMRCluster(2, dfsCluster.getFileSystem().getUri().toString(), 1);
+      JobTracker jobTracker = mrCluster.getJobTrackerRunner().getJobTracker();
+      Path jobFolderPath = new Path(jobTracker.getSystemDir());
+      FileSystem fileSystem = jobTracker.getFileSystem();
+      Path restartCountFile = jobTracker.recoveryManager.getRestartCountFile();
+      FSDataOutputStream restartFileStream = fileSystem
+          .create(restartCountFile);
+      restartFileStream.writeInt(0);
+      restartFileStream.close();
+      Path tempRestartFile = jobTracker.recoveryManager
+          .getTempRestartCountFile();
+      FSDataOutputStream tempRestartFileStream = fileSystem
+          .create(tempRestartFile);
+      tempRestartFileStream.writeInt(0);
+      tempRestartFileStream.close();
+      assertTrue("Restart Count File doesn't exist", fileSystem
+          .exists(restartCountFile));
+      assertTrue("Restart Count File doesn't exist", fileSystem
+          .exists(tempRestartFile));
+      FileStatus[] jobFilesForRecovery = jobTracker.getJobFilesForRecovery(
+          fileSystem, jobFolderPath);
+      assertTrue("Restart Count File and Temp Restart count file exist.",
+          0 == jobFilesForRecovery.length);
+    } finally {
+      if(mrCluster != null) mrCluster.shutdown();
+      if(dfsCluster != null) dfsCluster.shutdown();
+    }
   }
 }