瀏覽代碼

MAPREDUCE-4749. Fixed a bug in TaskTracker because of which kill-actions get delayed progressively. Contributed by Arpit Gupta.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1407717 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 12 年之前
父節點
當前提交
9e285887e7

+ 3 - 0
CHANGES.txt

@@ -342,6 +342,9 @@ Release 1.1.1 - Unreleased
     MAPREDUCE-4782. NLineInputFormat skips first line of last InputSplit
     (Mark Fuhs via bobby)
 
+    MAPREDUCE-4749. Fixed a bug in TaskTracker because of which kill-actions get
+    delayed progressively. (Arpit Gupta via vinodkv)
+
 Release 1.1.0 - 2012.09.28
 
   INCOMPATIBLE CHANGES

+ 103 - 50
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -44,6 +44,7 @@ import java.util.TreeMap;
 import java.util.Vector;
 import java.util.Map.Entry;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
@@ -440,50 +441,105 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
   }
   
   /**
-   * A list of tips that should be cleaned up.
+   * A list of clean-up actions
    */
-  private BlockingQueue<TaskTrackerAction> tasksToCleanup = 
-    new LinkedBlockingQueue<TaskTrackerAction>();
-    
+  // No ConcurrentHashSet :(
+  ConcurrentHashMap<String, String> allCleanupActions = 
+      new ConcurrentHashMap<String, String>();
+  BlockingQueue<TaskTrackerAction> activeCleanupActions = 
+      new LinkedBlockingQueue<TaskTrackerAction>();
+  List<TaskTrackerAction> inactiveCleanupActions = 
+      new ArrayList<TaskTrackerAction>();
+
+  /*
+   * Add action to clean up queue. Check to make sure action is not already
+   * present before adding it to the queue.
+   */
+  void addActionToCleanup(TaskTrackerAction action) throws InterruptedException {
+
+    String actionId = getIdForCleanUpAction(action);
+
+    // add the action to the queue only if its not added in the first place
+    String previousActionId = allCleanupActions.putIfAbsent(actionId, actionId);
+    if (previousActionId != null) {
+      return;
+    } else {
+      activeCleanupActions.put(action);
+    }
+  }
+
+  /*
+   * Get the id for a given clean up action. It is expected to be either a
+   * KillJobAction or KillTaskAction
+   */
+  private String getIdForCleanUpAction(TaskTrackerAction action) {
+    String actionId = null;
+    // get the id of the action
+    if (action instanceof KillJobAction) {
+      actionId = ((KillJobAction) action).getJobID().toString();
+    } else if (action instanceof KillTaskAction) {
+      actionId = ((KillTaskAction) action).getTaskID().toString();
+    }
+    // Assuming actionId is not null as all clean-up actions are either KillJob
+    // or KillTask
+    return actionId;
+  }
+
   /**
    * A daemon-thread that pulls tips off the list of things to cleanup.
    */
-  private Thread taskCleanupThread = 
-    new Thread(new Runnable() {
-        public void run() {
-          while (true) {
-            try {
-              TaskTrackerAction action = tasksToCleanup.take();
-              if( !checkJobStatusAndWait(action) ) {
-                //If job is still localizing, put it back into the queue and
-                //pick another one in the next iteration
-                StringBuffer sb = new StringBuffer("Cleanup for job ");
-                if (action instanceof KillJobAction) {
-                  sb.append(((KillJobAction)action).getJobID());
-                } else if (action instanceof KillTaskAction) {
-                  sb.append(((KillTaskAction)action).getTaskID().getJobID() +
-                    ", task " + ((KillTaskAction)action).getTaskID());
-                }
-                sb.append(" was postponed after waiting for 5 seconds.");
-                LOG.info(sb);
-                tasksToCleanup.put(action);
-                continue;
-              }
-              if (action instanceof KillJobAction) {
-                purgeJob((KillJobAction) action);
-              } else if (action instanceof KillTaskAction) {
-                processKillTaskAction((KillTaskAction) action);
-              } else {
-                LOG.error("Non-delete action given to cleanup thread: "
-                          + action);
-              }
-            } catch (Throwable except) {
-              LOG.warn(StringUtils.stringifyException(except));
-            }
-          }
+  private Thread taskCleanupThread = new Thread(new Runnable() {
+    public void run() {
+      while (true) {
+        try {
+          taskCleanUp();
+        } catch (Throwable except) {
+          LOG.warn(StringUtils.stringifyException(except));
         }
-      }, "taskCleanup");
+      }
+    }
+  }, "taskCleanup");
+
+  void taskCleanUp() throws InterruptedException, IOException {
+    // process all the localizing tasks and move to the clean up queue
+    // if the job is not localizing.
+    Iterator<TaskTrackerAction> itr = inactiveCleanupActions.iterator();
+    while (itr.hasNext()) {
+      TaskTrackerAction action = itr.next();
+      if (!isJobLocalizing(action)) {
+        activeCleanupActions.put(action);
+        itr.remove();
+      }
+    }
 
+    // if the tasks to clean is empty better sleep for 2 seconds and
+    // re process
+    if (activeCleanupActions.isEmpty()) {
+      Thread.sleep(2000);
+      return;
+    }
+
+    TaskTrackerAction action = activeCleanupActions.take();
+    String actionId = getIdForCleanUpAction(action);
+    if (isJobLocalizing(action)) {
+      // If job is still localizing, put it back into the queue and
+      // pick another one in the next iteration
+      LOG.info("Cleanup for id " + actionId + " skipped as its localizing.");
+      inactiveCleanupActions.add(action);
+      return;
+    } else {
+      // remove the action from the hash as its being processed.
+      allCleanupActions.remove(actionId);
+    }
+    if (action instanceof KillJobAction) {
+      purgeJob((KillJobAction) action);
+    } else if (action instanceof KillTaskAction) {
+      processKillTaskAction((KillTaskAction) action);
+    } else {
+      LOG.error("Non-delete action given to cleanup thread: " + action);
+    }
+  }
+  
   void processKillTaskAction(KillTaskAction killAction) throws IOException {
     TaskInProgress tip;
     synchronized (TaskTracker.this) {
@@ -494,21 +550,19 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
   }
 
   /**
-   * Wait until the job has completed localizing if it was doing so.
+   * Check if the job for the given action is localizing or not.
    * @param action The command received from the JobTracker
-   * @return true if the wait was successful and the task is not localizing
-   * anymore. false if after 5 seconds, the job was still localizing
-   * @throws InterruptedException
+   * @return true if job is localizing and false otherwise.
    */
-  private boolean checkJobStatusAndWait(TaskTrackerAction action)
-  throws InterruptedException {
+  private boolean isJobLocalizing(TaskTrackerAction action)
+  {
     JobID jobId = null;
     if (action instanceof KillJobAction) {
       jobId = ((KillJobAction)action).getJobID();
     } else if (action instanceof KillTaskAction) {
       jobId = ((KillTaskAction)action).getTaskID().getJobID();
     } else {
-      return true;
+      return false;
     }
     RunningJob rjob = null;
     synchronized (runningJobs) {
@@ -516,11 +570,10 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
     }
     if (rjob != null) {
       synchronized (rjob) {
-        rjob.wait(5000);
-        return !rjob.localizing;
+        return rjob.localizing;
       }
     }
-    return true;
+    return false;
   }
 
   public TaskController getTaskController() {
@@ -1819,7 +1872,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
                 commitResponses.add(commitAction.getTaskID());
               }
             } else {
-              tasksToCleanup.put(action);
+              addActionToCleanup(action);
             }
           }
         }
@@ -3841,7 +3894,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
    * @return has this task tracker finished and cleaned up all of its tasks?
    */
   public synchronized boolean isIdleAndClean() {
-    return tasks.isEmpty() && tasksToCleanup.isEmpty();
+    return tasks.isEmpty() && allCleanupActions.isEmpty();
   }
 
   /**

+ 388 - 0
src/test/org/apache/hadoop/mapred/TestTaskTrackerActionCleanup.java

@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.TaskTracker.RunningJob;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class TestTaskTrackerActionCleanup extends TestCase {
+  String jtIdentifier = "201210122331";
+  TestTaskTracker tt = null;
+
+  @Before
+  public void setUp() {
+    tt = new TestTaskTracker();
+  }
+
+  @Test
+  // check duplicate entries do not make it to the queue
+  public void testDuplicateEntries() throws InterruptedException {
+
+    KillJobAction action = new KillJobAction();
+    // add the action twice
+    tt.addActionToCleanup(action);
+    tt.addActionToCleanup(action);
+
+    checkItemCountInQueue(tt, 1, 1, 0);
+  }
+
+  @Test
+  // test to make sure all tasks with localizing jobs are added to another queue
+  public void testLocalizingJobActions() throws InterruptedException,
+      IOException {
+
+    // job and attempt ids
+    JobID jobId1 = new JobID(jtIdentifier, 1);
+    JobID jobId2 = new JobID(jtIdentifier, 2);
+
+    TaskAttemptID taskAttemptId1 = new TaskAttemptID(jtIdentifier, 3, true, 1,
+        1);
+    TaskAttemptID taskAttemptId2 = new TaskAttemptID(jtIdentifier, 4, true, 1,
+        1);
+
+    // job actions which is localizing
+    KillJobAction jAction1 = new KillJobAction(jobId1);
+    RunningJob rjob1 = new RunningJob(jAction1.getJobID());
+    rjob1.localizing = true;
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(jobId1, rjob1);
+    tt.addActionToCleanup(jAction1);
+
+    KillJobAction jAction2 = new KillJobAction(jobId2);
+    RunningJob rjob2 = new RunningJob(jAction2.getJobID());
+    rjob2.localizing = true;
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(jobId2, rjob2);
+    tt.addActionToCleanup(jAction2);
+
+    // task action which is localizing
+    KillTaskAction tAction1 = new KillTaskAction(taskAttemptId1);
+    RunningJob rjob3 = new RunningJob(tAction1.getTaskID().getJobID());
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(rjob3.getJobID(), rjob3);
+    rjob3.localizing = true;
+    tt.addActionToCleanup(tAction1);
+
+    KillTaskAction tAction2 = new KillTaskAction(taskAttemptId2);
+    RunningJob rjob4 = new RunningJob(tAction2.getTaskID().getJobID());
+    rjob4.localizing = true;
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(rjob4.getJobID(), rjob4);
+    tt.addActionToCleanup(tAction2);
+
+    // before the task clean up test the queue
+    checkItemCountInQueue(tt, 4, 4, 0);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after the one round the first task gets moved to localizing list
+    checkItemCountInQueue(tt, 4, 3, 1);
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after the another round the 2nd task gets moved to localizing list
+    checkItemCountInQueue(tt, 4, 2, 2);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after the another round the 3rd task gets moved to localizing list
+    checkItemCountInQueue(tt, 4, 1, 3);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after the another round the 4th task gets moved to localizing list
+    checkItemCountInQueue(tt, 4, 0, 4);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // this is a no-op there is a sleep here but cant really test it
+    checkItemCountInQueue(tt, 4, 0, 4);
+  }
+
+  @Test
+  // test when all active items in the queue
+  public void testAllActiveJobActions() throws InterruptedException,
+      IOException {
+    // job and attempt ids
+    JobID jobId1 = new JobID(jtIdentifier, 1);
+    JobID jobId2 = new JobID(jtIdentifier, 2);
+
+    TaskAttemptID taskAttemptId1 = new TaskAttemptID(jtIdentifier, 3, true, 1,
+        1);
+    TaskAttemptID taskAttemptId2 = new TaskAttemptID(jtIdentifier, 4, true, 1,
+        1);
+
+    // job actions which is localizing
+    KillJobAction jAction1 = new KillJobAction(jobId1);
+    RunningJob rjob1 = new RunningJob(jAction1.getJobID());
+    rjob1.localizing = false;
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(jobId1, rjob1);
+    tt.addActionToCleanup(jAction1);
+
+    KillJobAction jAction2 = new KillJobAction(jobId2);
+    RunningJob rjob2 = new RunningJob(jAction2.getJobID());
+    rjob2.localizing = false;
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(jobId2, rjob2);
+    tt.addActionToCleanup(jAction2);
+
+    // task action which is localizing
+    KillTaskAction tAction1 = new KillTaskAction(taskAttemptId1);
+    RunningJob rjob3 = new RunningJob(tAction1.getTaskID().getJobID());
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(rjob3.getJobID(), rjob3);
+    rjob3.localizing = false;
+    tt.addActionToCleanup(tAction1);
+
+    KillTaskAction tAction2 = new KillTaskAction(taskAttemptId2);
+    RunningJob rjob4 = new RunningJob(tAction2.getTaskID().getJobID());
+    rjob4.localizing = false;
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(rjob4.getJobID(), rjob4);
+    tt.addActionToCleanup(tAction2);
+
+    // before the task clean up test the queue
+    checkItemCountInQueue(tt, 4, 4, 0);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after the one round the first task gets killed
+    checkItemCountInQueue(tt, 3, 3, 0);
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after the another round the 2nd task gets killed
+    checkItemCountInQueue(tt, 2, 2, 0);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after the another round the 3rd task gets killed
+    checkItemCountInQueue(tt, 1, 1, 0);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after the another round the 4th task gets killed
+    checkItemCountInQueue(tt, 0, 0, 0);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // this is a no-op there is a sleep here but cant really test it
+    checkItemCountInQueue(tt, 0, 0, 0);
+  }
+
+  @Test
+  // test when some items are localizing and some are not in the queue
+  public void testMixedJobActions() throws InterruptedException, IOException {
+    // job and attempt ids
+    JobID jobId1 = new JobID(jtIdentifier, 1);
+    JobID jobId2 = new JobID(jtIdentifier, 2);
+
+    TaskAttemptID taskAttemptId1 = new TaskAttemptID(jtIdentifier, 3, true, 1,
+        1);
+    TaskAttemptID taskAttemptId2 = new TaskAttemptID(jtIdentifier, 4, true, 1,
+        1);
+
+    // job actions which is localizing
+    KillJobAction jAction1 = new KillJobAction(jobId1);
+    RunningJob rjob1 = new RunningJob(jAction1.getJobID());
+    rjob1.localizing = true;
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(jobId1, rjob1);
+    tt.addActionToCleanup(jAction1);
+
+    KillJobAction jAction2 = new KillJobAction(jobId2);
+    RunningJob rjob2 = new RunningJob(jAction2.getJobID());
+    rjob2.localizing = false;
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(jobId2, rjob2);
+    tt.addActionToCleanup(jAction2);
+
+    // task action which is localizing
+    KillTaskAction tAction1 = new KillTaskAction(taskAttemptId1);
+    RunningJob rjob3 = new RunningJob(tAction1.getTaskID().getJobID());
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(rjob3.getJobID(), rjob3);
+    rjob3.localizing = true;
+    tt.addActionToCleanup(tAction1);
+
+    KillTaskAction tAction2 = new KillTaskAction(taskAttemptId2);
+    RunningJob rjob4 = new RunningJob(tAction2.getTaskID().getJobID());
+    rjob4.localizing = false;
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(rjob4.getJobID(), rjob4);
+    tt.addActionToCleanup(tAction2);
+
+    // before the task clean up test the queue
+    checkItemCountInQueue(tt, 4, 4, 0);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // since the first attempt is localizing it will move to the
+    // localizing queue
+    checkItemCountInQueue(tt, 4, 3, 1);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after the 2nd round the tip is a job that can be cleaned up
+    checkItemCountInQueue(tt, 3, 2, 1);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after the 3rd round the 3rd task is getting localized
+    checkItemCountInQueue(tt, 3, 1, 2);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after the another round the 4th task gets killed
+    checkItemCountInQueue(tt, 2, 0, 2);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // nothing should change as the tasks are being localized
+    checkItemCountInQueue(tt, 2, 0, 2);
+
+  }
+
+  // test when some items are localizing and some are not in the queue
+  // and we update the status of the actions between clean up jobs
+  @Test
+  public void testMixedJobActionsAndUpdateActions()
+      throws InterruptedException, IOException {
+    // job and attempt ids
+    JobID jobId1 = new JobID(jtIdentifier, 1);
+    JobID jobId2 = new JobID(jtIdentifier, 2);
+
+    TaskAttemptID taskAttemptId1 = new TaskAttemptID(jtIdentifier, 3, true, 1,
+        1);
+    TaskAttemptID taskAttemptId2 = new TaskAttemptID(jtIdentifier, 4, true, 1,
+        1);
+
+    // job actions which is localizing
+    KillJobAction jAction1 = new KillJobAction(jobId1);
+    RunningJob rjob1 = new RunningJob(jAction1.getJobID());
+    rjob1.localizing = true;
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(jobId1, rjob1);
+    tt.addActionToCleanup(jAction1);
+
+    KillJobAction jAction2 = new KillJobAction(jobId2);
+    RunningJob rjob2 = new RunningJob(jAction2.getJobID());
+    rjob2.localizing = false;
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(jobId2, rjob2);
+    tt.addActionToCleanup(jAction2);
+
+    // task action which is localizing
+    KillTaskAction tAction1 = new KillTaskAction(taskAttemptId1);
+    RunningJob rjob3 = new RunningJob(tAction1.getTaskID().getJobID());
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(rjob3.getJobID(), rjob3);
+    rjob3.localizing = true;
+    tt.addActionToCleanup(tAction1);
+
+    KillTaskAction tAction2 = new KillTaskAction(taskAttemptId2);
+    RunningJob rjob4 = new RunningJob(tAction2.getTaskID().getJobID());
+    rjob4.localizing = false;
+    // add the job to the runningJobs tree
+    tt.runningJobs.put(rjob4.getJobID(), rjob4);
+    tt.addActionToCleanup(tAction2);
+
+    // before the task clean up test the queue
+    checkItemCountInQueue(tt, 4, 4, 0);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // since the first attempt is localizing it will move to the
+    // localizing queue
+    checkItemCountInQueue(tt, 4, 3, 1);
+
+    // before running clean up again change the status of the first attempt to
+    // not be localizing.
+    rjob1.localizing = false;
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after the 2nd round the tip is a job that can be cleaned up so it is
+    // cleaned and the localizing job is no longer being localized so it moves
+    // to the active queue
+    checkItemCountInQueue(tt, 3, 3, 0);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after the 3rd round the 3rd task is getting localized
+    // so gets moved to the localized queue.
+    checkItemCountInQueue(tt, 3, 2, 1);
+
+    // now run the task clean up
+    tt.taskCleanUp();
+
+    // after the another round the 4th task gets killed
+    // and leaves 2 tasks one which switched from localizing to clean
+    // and one that is being localized.
+    checkItemCountInQueue(tt, 2, 1, 1);
+  }
+
+  /*
+   * Method to test the number of items in the various datastructures holding
+   * clean up actions
+   */
+  private void checkItemCountInQueue(TaskTracker tt, int allSize,
+      int activeSize, int inactiveSize) {
+    // check the size and the content of the hashset and queue
+    assertEquals("Size of allCleanUpActions is not " + allSize, allSize,
+        tt.allCleanupActions.size());
+    assertEquals("Size of activeCleanUpActions is not " + activeSize,
+        activeSize, tt.activeCleanupActions.size());
+    assertEquals("Size of inactiveCleanUpActions is not " + inactiveSize,
+        inactiveSize, tt.inactiveCleanupActions.size());
+  }
+
+  class TestTaskTracker extends TaskTracker {
+    // override the method so its a no-op
+    synchronized void purgeJob(KillJobAction action) throws IOException {
+      LOG.info("Received 'KillJobAction' for job: " + action.getJobID());
+    }
+
+    // override the method so its a no-op
+    void processKillTaskAction(KillTaskAction killAction) throws IOException {
+      LOG.info("Received KillTaskAction for task: " + killAction.getTaskID());
+    }
+  }
+}