Bläddra i källkod

HADOOP-5921. Fixes a problem in the JobTracker where it sometimes never used to come up due to a system file creation on JobTracker's system-dir failing. This problem would sometimes show up only when the FS for the system-dir (usually HDFS) is started at nearly the same time as the JobTracker. Contributed by Amar Kamat.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@784661 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 16 år sedan
förälder
incheckning
469918dc6f

+ 6 - 0
CHANGES.txt

@@ -939,6 +939,12 @@ Release 0.20.1 - Unreleased
     causing TestQueueCapacities to fail.
     (Sreekanth Ramakrishnan via yhemanth)
 
+    HADOOP-5921. Fixes a problem in the JobTracker where it sometimes never used
+    to come up due to a system file creation on JobTracker's system-dir failing. 
+    This problem would sometimes show up only when the FS for the system-dir 
+    (usually HDFS) is started at nearly the same time as the JobTracker. 
+    (Amar Kamat via ddas)
+
 Release 0.20.0 - 2009-04-15
 
   INCOMPATIBLE CHANGES

+ 44 - 26
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -121,7 +121,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   private int NUM_HEARTBEATS_IN_SECOND = 100;
   public static enum State { INITIALIZING, RUNNING }
   State state = State.INITIALIZING;
-  private static final int SYSTEM_DIR_CLEANUP_RETRY_PERIOD = 10000;
+  private static final int FS_ACCESS_RETRY_PERIOD = 10000;
 
   private DNSToSwitchMapping dnsToSwitchMapping;
   private NetworkTopology clusterMap = new NetworkTopology();
@@ -1194,17 +1194,38 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         shouldRecover = false;
 
         // write the jobtracker.info file
-        FSDataOutputStream out = FileSystem.create(fs, restartFile, filePerm);
-        out.writeInt(0);
-        out.close();
+        try {
+          FSDataOutputStream out = FileSystem.create(fs, restartFile, 
+                                                     filePerm);
+          out.writeInt(0);
+          out.close();
+        } catch (IOException ioe) {
+          LOG.warn("Writing to file " + restartFile + " failed!");
+          LOG.warn("FileSystem is not ready yet!");
+          fs.delete(restartFile, false);
+          throw ioe;
+        }
         return;
       }
 
       FSDataInputStream in = fs.open(restartFile);
-      // read the old count
-      restartCount = in.readInt();
-      ++restartCount; // increment the restart count
-      in.close();
+      try {
+        // read the old count
+        restartCount = in.readInt();
+        ++restartCount; // increment the restart count
+      } catch (IOException ioe) {
+        LOG.warn("System directory is garbled. Failed to read file " 
+                 + restartFile);
+        LOG.warn("Jobtracker recovery is not possible with garbled"
+                 + " system directory! Please delete the system directory and"
+                 + " restart the jobtracker. Note that deleting the system" 
+                 + " directory will result in loss of all the running jobs.");
+        throw new RuntimeException(ioe);
+      } finally {
+        if (in != null) {
+          in.close();
+        }
+      }
 
       // Write back the new restart count and rename the old info file
       //TODO This is similar to jobhistory recovery, maybe this common code
@@ -1725,24 +1746,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         }
         LOG.info("problem cleaning system directory: " + systemDir, ie);
       }
-      Thread.sleep(SYSTEM_DIR_CLEANUP_RETRY_PERIOD);
-    }
-
-    // Prepare for recovery. This is done irrespective of the status of restart
-    // flag.
-    try {
-      recoveryManager.updateRestartCount();
-    } catch (IOException ioe) {
-      LOG.warn("Failed to initialize recovery manager. The Recovery manager "
-               + "failed to access the system files in the system dir (" 
-               + getSystemDir() + ")."); 
-      LOG.warn("It might be because the JobTracker failed to read/write system"
-               + " files (" + recoveryManager.getRestartCountFile() + " / " 
-               + recoveryManager.getTempRestartCountFile() + ") or the system "
-               + " file " + recoveryManager.getRestartCountFile() 
-               + " is missing!");
-      LOG.warn("Bailing out...");
-      throw ioe;
+      Thread.sleep(FS_ACCESS_RETRY_PERIOD);
     }
     
     // Same with 'localDir' except it's always on the local disk.
@@ -1860,6 +1864,20 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
    * Run forever
    */
   public void offerService() throws InterruptedException, IOException {
+    // Prepare for recovery. This is done irrespective of the status of restart
+    // flag.
+    while (true) {
+      try {
+        recoveryManager.updateRestartCount();
+        break;
+      } catch (IOException ioe) {
+        LOG.warn("Failed to initialize recovery manager. ", ioe);
+        // wait for some time
+        Thread.sleep(FS_ACCESS_RETRY_PERIOD);
+        LOG.warn("Retrying...");
+      }
+    }
+
     taskScheduler.start();
     
     //  Start the recovery after starting the scheduler

+ 53 - 2
src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
 import org.apache.hadoop.mapred.MiniMRCluster.JobTrackerRunner;
 import org.apache.hadoop.mapred.TestJobInProgressListener.MyScheduler;
@@ -310,7 +311,7 @@ public class TestRecoveryManager extends TestCase {
     fs.delete(rFile,false);
     
     // start the jobtracker
-    LOG.info("Stopping jobtracker with system files deleted");
+    LOG.info("Starting jobtracker with system files deleted");
     mr.startJobTracker();
     
     UtilsForTests.waitForJobTracker(jc);
@@ -394,8 +395,58 @@ public class TestRecoveryManager extends TestCase {
     LOG.info("Starting jobtracker with fs errors");
     mr.startJobTracker();
     JobTrackerRunner runner = mr.getJobTrackerRunner();
-    assertFalse("Restart count for new job is incorrect", runner.isActive());
+    assertFalse("JobTracker is still alive", runner.isActive());
 
     mr.shutdown();
   } 
+
+  /**
+   * Test that updating the restart count fails without datanodes, leaving no
+   * info file or temporary file behind, and succeeds once a datanode is up.
+   */
+  public void testJobTrackerInfoCreation() throws Exception {
+    LOG.info("Testing jobtracker.info file");
+    MiniDFSCluster dfs = new MiniDFSCluster(new Configuration(), 1, true, null);
+    String namenode = (dfs.getFileSystem()).getUri().getHost() + ":"
+                      + (dfs.getFileSystem()).getUri().getPort();
+    // shut down the data nodes so that the first update attempt has none
+    dfs.shutdownDataNodes();
+
+    // start the jobtracker
+    JobConf conf = new JobConf();
+    FileSystem.setDefaultUri(conf, namenode);
+    conf.set("mapred.job.tracker", "localhost:0");
+    conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
+
+    JobTracker jobtracker = new JobTracker(conf);
+
+    // the update is expected to fail with an IOException without datanodes
+    boolean failed = false;
+    try {
+      jobtracker.recoveryManager.updateRestartCount();
+    } catch (IOException ioe) {
+      failed = true;
+    }
+    assertTrue("JobTracker created info files without datanodes!!!", failed);
+
+    Path restartFile = jobtracker.recoveryManager.getRestartCountFile();
+    Path tmpRestartFile = jobtracker.recoveryManager.getTempRestartCountFile();
+    FileSystem fs = dfs.getFileSystem();
+    assertFalse("Info file exists after update failure", 
+                fs.exists(restartFile));
+    assertFalse("Temporary restart-file exists after update failure", 
+                fs.exists(tmpRestartFile));
+
+    // start 1 data node
+    dfs.startDataNodes(conf, 1, true, null, null, null, null);
+    dfs.waitActive();
+
+    failed = false;
+    try {
+      jobtracker.recoveryManager.updateRestartCount();
+    } catch (IOException ioe) {
+      failed = true;
+    }
+    assertFalse("JobTracker failed to create info files with datanodes!!!", failed);
+  }
 }