
Merge -r 761631:761632 from trunk onto 0.20 branch. Fixes HADOOP-5337.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20@761637 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das · 16 years ago · commit 946e38fce5

+ 4 - 0
CHANGES.txt

@@ -836,6 +836,10 @@ Release 0.20.0 - Unreleased
 
     HADOOP-5605. All the replicas incorrectly got marked as corrupt. (hairong)
 
+    HADOOP-5337. JobTracker, upon restart, now waits for the TaskTrackers to
+    join back before scheduling new tasks. This fixes race conditions associated
+    with greedy scheduling as was the case earlier. (Amar Kamat via ddas) 
+
 Release 0.19.2 - Unreleased
 
   BUG FIXES
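
The CHANGES.txt entry above captures the idea of the fix: after a restart, the JobTracker keeps new-task scheduling closed until every TaskTracker it knew about before the restart has either reported back or been declared lost. The mechanism, mirrored from the JobTracker.java diff below, is a synchronized set of not-yet-rejoined trackers that gates assignment. A minimal, self-contained sketch of that gate follows; the wrapper class name is invented for illustration, while the method names match the ones added to the RecoveryManager in the diff:

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    /** Illustrative sketch of the scheduling gate added by HADOOP-5337. */
    class RecoverySafeModeSketch {
      // Trackers known from the pre-restart state that have not reported back yet.
      private final Set<String> recoveredTrackers =
          Collections.synchronizedSet(new HashSet<String>());

      /** Recovery path: remember a tracker we expect to rejoin. */
      void markTracker(String trackerName) {
        recoveredTrackers.add(trackerName);
      }

      /** A tracker has heartbeated after the restart, or has been declared lost. */
      void unMarkTracker(String trackerName) {
        recoveredTrackers.remove(trackerName);
      }

      /** Scheduling stays closed until every expected tracker has rejoined or been lost. */
      boolean shouldSchedule() {
        return recoveredTrackers.isEmpty();
      }
    }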

+ 22 - 1
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -672,6 +672,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     Set<JobID> jobsToRecover; // set of jobs to be recovered
     
     private int totalEventsRecovered = 0;
+
+    Set<String> recoveredTrackers = 
+      Collections.synchronizedSet(new HashSet<String>());
     
     /** A custom listener that replays the events in the order in which the 
      * events (task attempts) occurred. 
@@ -848,6 +851,18 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       return jobsToRecover.size() != 0;
     }
 
+    public boolean shouldSchedule() {
+      return recoveredTrackers.isEmpty();
+    }
+
+    private void markTracker(String trackerName) {
+      recoveredTrackers.add(trackerName);
+    }
+
+    void unMarkTracker(String trackerName) {
+      recoveredTrackers.remove(trackerName);
+    }
+
     Set<JobID> getJobsToRecover() {
       return jobsToRecover;
     }
@@ -984,6 +999,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       // IV. Register a new tracker
       boolean isTrackerRegistered = getTaskTracker(trackerName) != null;
       if (!isTrackerRegistered) {
+        markTracker(trackerName); // add the tracker to recovery-manager
         addNewTracker(ttStatus);
       }
       
@@ -2321,6 +2337,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         // started JobTracker
         if (hasRestarted()) {
           addRestartInfo = true;
+          // inform the recovery manager about this tracker joining back
+          recoveryManager.unMarkTracker(trackerName);
         } else {
           // Jobtracker might have restarted but no recovery is needed
           // otherwise this code should not be reached
@@ -2362,7 +2380,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
       
     // Check for new tasks to be executed on the tasktracker
-    if (acceptNewTasks && !isBlacklisted) {
+    if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
       TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
       if (taskTrackerStatus == null) {
         LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
@@ -3306,6 +3324,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       trackerToJobsToCleanup.remove(trackerName);
     }
     
+    // Inform the recovery manager
+    recoveryManager.unMarkTracker(trackerName);
+    
     Set<TaskAttemptID> lostTasks = trackerToTaskMap.get(trackerName);
     trackerToTaskMap.remove(trackerName);
 

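The JobTracker.java hunks above wire that gate into three call sites: markTracker() when a tracker recorded in the job history is registered during recovery, unMarkTracker() when a tracker contacts the restarted JobTracker on a heartbeat, and unMarkTracker() again when a tracker is declared lost and its state is cleaned up, so a dead tracker cannot hold the gate closed forever; the heartbeat path then consults shouldSchedule() before handing out new tasks. The small self-contained walkthrough below traces that lifecycle with a plain synchronized set; the tracker names are made up for illustration:

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    /** Traces the gate lifecycle implemented by the hunks above (names illustrative). */
    public class SafeModeLifecycleDemo {
      public static void main(String[] args) {
        Set<String> recoveredTrackers = Collections.synchronizedSet(new HashSet<String>());

        // Recovery-time registration (markTracker): trackers seen in history are expected back.
        recoveredTrackers.add("tracker_host1:50060");
        recoveredTrackers.add("tracker_host2:50060");
        System.out.println("shouldSchedule = " + recoveredTrackers.isEmpty()); // false

        // host1 heartbeats the restarted JobTracker (unMarkTracker on the heartbeat path).
        recoveredTrackers.remove("tracker_host1:50060");
        // host2 never rejoins and is expired (unMarkTracker on the lost-tracker path).
        recoveredTrackers.remove("tracker_host2:50060");

        System.out.println("shouldSchedule = " + recoveredTrackers.isEmpty()); // true
      }
    }
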
+ 34 - 19
src/test/org/apache/hadoop/mapred/MiniMRCluster.java

@@ -438,7 +438,7 @@ public class MiniMRCluster {
     this.jobTrackerPort = jobTrackerPort;
     this.taskTrackerPort = taskTrackerPort;
     this.jobTrackerInfoPort = 0;
-    this.numTaskTrackers = numTaskTrackers;
+    this.numTaskTrackers = 0;
     this.namenode = namenode;
     this.ugi = ugi;
     this.conf = conf; // this is the conf the mr starts with
@@ -448,27 +448,18 @@ public class MiniMRCluster {
 
     // Create the TaskTrackers
     for (int idx = 0; idx < numTaskTrackers; idx++) {
+      String rack = null;
+      String host = null;
       if (racks != null) {
-        StaticMapping.addNodeToRack(hosts[idx],racks[idx]);
+        rack = racks[idx];
       }
       if (hosts != null) {
-        NetUtils.addStaticResolution(hosts[idx], "localhost");
+        host = hosts[idx];
       }
-      TaskTrackerRunner taskTracker;
-      taskTracker = new TaskTrackerRunner(idx, numDir, 
-          hosts == null ? null : hosts[idx], conf);
       
-      Thread taskTrackerThread = new Thread(taskTracker);
-      taskTrackerList.add(taskTracker);
-      taskTrackerThreadList.add(taskTrackerThread);
+      startTaskTracker(host, rack, idx, numDir);
     }
 
-    // Start the MiniMRCluster
-        
-    for (Thread taskTrackerThread : taskTrackerThreadList){
-      taskTrackerThread.start();
-    }
-    
     this.job = createJobConf(conf);
     waitUntilIdle();
   }
@@ -598,19 +589,43 @@ public class MiniMRCluster {
    * Kill the tasktracker.
    */
   public void stopTaskTracker(int id) {
-    taskTrackerList.get(id).shutdown();
+    TaskTrackerRunner tracker = taskTrackerList.remove(id);
+    tracker.shutdown();
 
-    taskTrackerThreadList.get(id).interrupt();
+    Thread thread = taskTrackerThreadList.remove(id);
+    thread.interrupt();
     
     try {
-      taskTrackerThreadList.get(id).join();
+      thread.join();
       // This will break the wait until idle loop
-      taskTrackerList.get(id).isDead = true;
+      tracker.isDead = true;
+      --numTaskTrackers;
     } catch (InterruptedException ex) {
       LOG.error("Problem waiting for task tracker to finish", ex);
     }
   }
   
+  /**
+   * Start the tasktracker.
+   */
+  public void startTaskTracker(String host, String rack, int idx, int numDir) 
+  throws IOException {
+    if (rack != null) {
+      StaticMapping.addNodeToRack(host, rack);
+    }
+    if (host != null) {
+      NetUtils.addStaticResolution(host, "localhost");
+    }
+    TaskTrackerRunner taskTracker;
+    taskTracker = new TaskTrackerRunner(idx, numDir, host, conf);
+    
+    Thread taskTrackerThread = new Thread(taskTracker);
+    taskTrackerList.add(taskTracker);
+    taskTrackerThreadList.add(taskTrackerThread);
+    taskTrackerThread.start();
+    ++numTaskTrackers;
+  }
+  
   /**
    * Get the tasktrackerID in MiniMRCluster with given trackerName.
    */

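The MiniMRCluster changes factor per-tracker startup into a public startTaskTracker(host, rack, idx, numDir) and make stopTaskTracker(id) remove the runner and its thread while keeping numTaskTrackers in step, so a test can shrink and regrow the cluster at runtime; TestJobTrackerSafeMode below relies on exactly that. A hedged sketch of how a test might drive the new calls (it uses a simpler MiniMRCluster constructor than the test below, and the values and restart sequence are assumed for illustration rather than being a complete test):

    import java.io.IOException;
    import org.apache.hadoop.mapred.MiniMRCluster;

    /** Illustrative helper showing the grow/shrink calls (setup values assumed). */
    class MiniClusterResizeSketch {
      static void resize(String namenode) throws IOException {
        MiniMRCluster mr = new MiniMRCluster(2, namenode, 1);  // two trackers, one dir each

        mr.stopTaskTracker(1);    // the runner and its thread are removed, not just shut down
        mr.stopJobTracker();      // restart the JobTracker around the lost tracker
        mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);
        mr.startJobTracker();

        // Bring up replacement trackers; null host/rack falls back to the defaults.
        mr.startTaskTracker(null, null, 2, 1);
        mr.startTaskTracker(null, null, 3, 1);

        mr.shutdown();
      }
    }
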
+ 3 - 4
src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java

@@ -105,10 +105,9 @@ public class TestJobTrackerRestartWithLostTracker extends TestCase {
     UtilsForTests.waitTillDone(jobClient);
 
     // Check if the tasks on the lost tracker got re-executed
-    assertTrue("Tracker killed while the jobtracker was down did not get lost "
-                + "upon restart", 
-                jobClient.getClusterStatus().getTaskTrackers() 
-                < mr.getNumTaskTrackers());
+    assertEquals("Tracker killed while the jobtracker was down did not get lost "
+                 + "upon restart", 
+                 jobClient.getClusterStatus().getTaskTrackers(), 1);
 
     // validate the history file
     TestJobHistory.validateJobHistoryFileFormat(id, job, "SUCCESS", true);

+ 280 - 0
src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java

@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.HashSet;
+import java.util.Set;
+
+/** 
+ * This test checks jobtracker in safe mode. In safe mode the jobtracker upon 
+ * restart doesnt schedule any new tasks and waits for the (old) trackers to 
+ * join back.
+ */
+public class TestJobTrackerSafeMode extends TestCase {
+  final Path testDir = 
+    new Path(System.getProperty("test.build.data", "/tmp"), "jt-safemode");
+  final Path inDir = new Path(testDir, "input");
+  final Path shareDir = new Path(testDir, "share");
+  final Path outputDir = new Path(testDir, "output");
+  final int numDir = 1;
+  final int numTrackers = 2;
+  
+  private static final Log LOG = 
+    LogFactory.getLog(TestJobTrackerSafeMode.class);
+  
+  private JobConf configureJob(JobConf conf, int maps, int reduces,
+                               String mapSignal, String redSignal) 
+  throws IOException {
+    UtilsForTests.configureWaitingJobConf(conf, inDir, outputDir, 
+        maps, reduces, "test-jobtracker-safemode", 
+        mapSignal, redSignal);
+    return conf;
+  }
+  
+  /**
+   * Tests the jobtracker's safemode. The test is as follows : 
+   *   - starts a cluster with 2 trackers
+   *   - submits a job with large (40) maps to make sure that all the trackers 
+   *     are logged to the job history
+   *   - wait for the job to be 50% done
+   *   - stop the jobtracker
+   *   - wait for the trackers to be done with all the tasks
+   *   - kill a task tracker
+   *   - start the jobtracker
+   *   - start 2 more trackers
+   *   - now check that while all the tracker are detected (or lost) the 
+   *     scheduling window is closed
+   *   - check that after all the trackers are recovered, scheduling is opened 
+   */
+  private void testSafeMode(MiniDFSCluster dfs, MiniMRCluster mr) 
+  throws IOException {
+    FileSystem fileSys = dfs.getFileSystem();
+    JobConf jobConf = mr.createJobConf();
+    String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
+    String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
+    JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+    int numTracker = jobtracker.getClusterStatus(false).getTaskTrackers();
+    
+    // Configure the jobs
+    JobConf job = configureJob(jobConf, 40, 0, mapSignalFile, redSignalFile);
+      
+    fileSys.delete(shareDir, true);
+    
+    // Submit a master job   
+    JobClient jobClient = new JobClient(job);
+    RunningJob rJob = jobClient.submitJob(job);
+    JobID id = rJob.getID();
+    
+    // wait for the job to be inited
+    mr.initializeJob(id);
+    
+    // Make sure that the master job is 50% completed
+    while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() 
+           < 0.5f) {
+      LOG.info("Waiting for the job to be 50% done");
+      UtilsForTests.waitFor(100);
+    }
+
+    // Kill the jobtracker
+    mr.stopJobTracker();
+
+    // Enable recovery on restart
+    mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
+                                      true);
+    
+    // Signal the maps to complete
+    UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
+    
+    // Signal the reducers to complete
+    UtilsForTests.signalTasks(dfs, fileSys, false, mapSignalFile, 
+                              redSignalFile);
+    
+    // wait for the tasks to complete at the tracker
+    Set<String> trackers = new HashSet<String>();
+    for (int i = 0 ; i < numTracker; ++i) {
+      TaskTracker t = mr.getTaskTrackerRunner(i).getTaskTracker();
+      trackers.add(t.getName());
+      int runningCount = t.getRunningTaskStatuses().size();
+      while (runningCount != 0) {
+        LOG.info("Waiting for tracker " + t.getName() + " to stabilize");
+        UtilsForTests.waitFor(100);
+        runningCount = 0;
+        for (TaskStatus status : t.getRunningTaskStatuses()) {
+          if (status.getIsMap() 
+              && (status.getRunState() == TaskStatus.State.UNASSIGNED 
+                  || status.getRunState() == TaskStatus.State.RUNNING)) {
+            ++runningCount;
+          }
+        }
+      }
+    }
+
+    LOG.info("Trackers have stabilized");
+    
+    // Kill a tasktracker
+    int trackerToKill = --numTracker;
+    TaskTracker t = mr.getTaskTrackerRunner(trackerToKill).getTaskTracker();
+    
+    trackers.remove(t.getName()); // remove this from the set to check
+    
+    Set<String> lostTrackers = new HashSet<String>();
+    lostTrackers.add(t.getName());
+    
+    // get the attempt-id's to ignore
+    // stop the tracker
+    LOG.info("Stopping tracker : " + t.getName());
+    mr.getTaskTrackerRunner(trackerToKill).getTaskTracker().shutdown();
+    mr.stopTaskTracker(trackerToKill);
+
+    // Restart the jobtracker
+    mr.startJobTracker();
+
+    // Wait for the JT to be ready
+    UtilsForTests.waitForJobTracker(jobClient);
+
+    jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
+    // Start a tracker
+    LOG.info("Start a new tracker");
+    mr.startTaskTracker(null, null, ++numTracker, numDir);
+    
+    // Start a tracker
+    LOG.info("Start a new tracker");
+    mr.startTaskTracker(null, null, ++numTracker, numDir);
+
+    // Check if the jobs are still running
+    
+    // Wait for the tracker to be lost
+    boolean shouldSchedule = jobtracker.recoveryManager.shouldSchedule();
+    while (!checkTrackers(jobtracker, trackers, lostTrackers)) {
+      assertFalse("JobTracker has opened up scheduling before all the" 
+                  + " trackers were recovered", shouldSchedule);
+      UtilsForTests.waitFor(100);
+      
+      // snapshot jobtracker's scheduling status
+      shouldSchedule = jobtracker.recoveryManager.shouldSchedule();
+    }
+
+    assertTrue("JobTracker hasnt opened up scheduling even all the" 
+               + " trackers were recovered", 
+               jobtracker.recoveryManager.shouldSchedule());
+    
+    assertEquals("Recovery manager is in inconsistent state", 
+                 0, jobtracker.recoveryManager.recoveredTrackers.size());
+    
+    // wait for the job to be complete
+    UtilsForTests.waitTillDone(jobClient);
+  }
+
+  private boolean checkTrackers(JobTracker jobtracker, Set<String> present, 
+                                Set<String> absent) {
+    long jobtrackerRecoveryFinishTime = 
+      jobtracker.getStartTime() + jobtracker.getRecoveryDuration();
+    for (String trackerName : present) {
+      TaskTrackerStatus status = jobtracker.getTaskTracker(trackerName);
+      // check if the status is present and also the tracker has contacted back
+      // after restart
+      if (status == null 
+          || status.getLastSeen() < jobtrackerRecoveryFinishTime) {
+        return false;
+      }
+    }
+    for (String trackerName : absent) {
+      TaskTrackerStatus status = jobtracker.getTaskTracker(trackerName);
+      // check if the status is still present
+      if ( status != null) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Test {@link JobTracker}'s safe mode.
+   */
+  public void testJobTrackerSafeMode() throws IOException {
+    String namenode = null;
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+
+    try {
+      Configuration conf = new Configuration();
+      conf.setBoolean("dfs.replication.considerLoad", false);
+      dfs = new MiniDFSCluster(conf, 1, true, null, null);
+      dfs.waitActive();
+      fileSys = dfs.getFileSystem();
+      
+      // clean up
+      fileSys.delete(testDir, true);
+      
+      if (!fileSys.mkdirs(inDir)) {
+        throw new IOException("Mkdirs failed to create " + inDir.toString());
+      }
+
+      // Write the input file
+      UtilsForTests.writeFile(dfs.getNameNode(), conf, 
+                              new Path(inDir + "/file"), (short)1);
+
+      dfs.startDataNodes(conf, 1, true, null, null, null, null);
+      dfs.waitActive();
+
+      namenode = (dfs.getFileSystem()).getUri().getHost() + ":" 
+                 + (dfs.getFileSystem()).getUri().getPort();
+
+      // Make sure that jobhistory leads to a proper job restart
+      // So keep the blocksize and the buffer size small
+      JobConf jtConf = new JobConf();
+      jtConf.set("mapred.jobtracker.job.history.block.size", "512");
+      jtConf.set("mapred.jobtracker.job.history.buffer.size", "512");
+      jtConf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
+      jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
+      jtConf.setLong("mapred.tasktracker.expiry.interval", 5000);
+      jtConf.setInt("mapred.reduce.copy.backoff", 4);
+      jtConf.setLong("mapred.job.reuse.jvm.num.tasks", -1);
+      
+      mr = new MiniMRCluster(numTrackers, namenode, numDir, null, null, jtConf);
+      
+      // Test Lost tracker case
+      testSafeMode(dfs, mr);
+    } finally {
+      if (mr != null) {
+        try {
+          mr.shutdown();
+        } catch (Exception e) {}
+      }
+      if (dfs != null) {
+        try {
+          dfs.shutdown();
+        } catch (Exception e) {}
+      }
+    }
+  }
+
+  public static void main(String[] args) throws IOException {
+    new TestJobTrackerSafeMode().testJobTrackerSafeMode();
+  }
+}

+ 1 - 2
src/test/org/apache/hadoop/mapred/TestLostTracker.java

@@ -86,8 +86,7 @@ public class TestLostTracker extends TestCase {
     UtilsForTests.waitTillDone(jobClient);
 
     // Check if the tasks on the lost tracker got killed and re-executed
-    assertTrue(jobClient.getClusterStatus().getTaskTrackers() 
-                < mr.getNumTaskTrackers());
+    assertEquals(jobClient.getClusterStatus().getTaskTrackers(), 1);
     assertEquals(JobStatus.SUCCEEDED, rJob.getJobState());
     TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
                          getTip(taskid.getTaskID());