Преглед на файлове

HADOOP-840. Queue task cleanups. Contributed by Owen & Mahadev.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@492681 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting преди 18 години
родител
ревизия
1863c9affc
променени са 2 файла, в които са добавени 100 реда и са изтрити 153 реда
  1. 3 0
      CHANGES.txt
  2. 97 153
      src/java/org/apache/hadoop/mapred/TaskTracker.java

+ 3 - 0
CHANGES.txt

@@ -185,6 +185,9 @@ Trunk (unreleased changes)
     intermediate outputs may happen at any time, potentially causing
     task timeouts.  (Devaraj Das via cutting)
 
+53. HADOOP-840.  In task tracker, queue task cleanups and perform them
+    in a separate thread.  (omalley & Mahadev Konar via cutting)
+
 
 Release 0.9.2 - 2006-12-15
 

+ 97 - 153
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -21,7 +21,6 @@ import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.ipc.*;
-import org.apache.hadoop.io.*;
 import org.apache.hadoop.metrics.Metrics;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -29,6 +28,8 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import java.io.*;
 import java.net.*;
 import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.regex.Pattern;
 
 import javax.servlet.ServletContext;
@@ -77,7 +78,7 @@ public class TaskTracker
     
     boolean shuttingDown = false;
     
-    Map<String, TaskInProgress> tasks = null;
+    Map<String, TaskInProgress> tasks = new HashMap();
     /**
      * Map from taskId -> TaskInProgress.
      */
@@ -134,7 +135,8 @@ public class TaskTracker
     /**
      * A list of tips that should be cleaned up.
      */
-    private BlockingQueue tasksToCleanup = new BlockingQueue();
+    private BlockingQueue<TaskTrackerAction> tasksToCleanup = 
+      new LinkedBlockingQueue();
     
     /**
      * A daemon-thread that pulls tips off the list of things to cleanup.
@@ -144,8 +146,22 @@ public class TaskTracker
         public void run() {
           while (true) {
             try {
-              TaskInProgress tip = (TaskInProgress) tasksToCleanup.take();
-              tip.jobHasFinished();
+              TaskTrackerAction action = tasksToCleanup.take();
+              if (action instanceof KillJobAction) {
+                purgeJob((KillJobAction) action);
+              } else if (action instanceof KillTaskAction) {
+                TaskInProgress tip;
+                KillTaskAction killAction = (KillTaskAction) action;
+                synchronized (TaskTracker.this) {
+                  tip = tasks.get(killAction.getTaskId());
+                }
+                LOG.info("Received KillTaskAction for task: " + 
+                         killAction.getTaskId());
+                purgeTask(tip);
+              } else {
+                LOG.error("Non-delete action given to cleanup thread: "
+                          + action);
+              }
             } catch (Throwable except) {
               LOG.warn(StringUtils.stringifyException(except));
             }
@@ -163,7 +179,7 @@ public class TaskTracker
       synchronized (runningJobs) {
         RunningJob rJob = null;
         if (!runningJobs.containsKey(jobId)) {
-          rJob = new RunningJob(localJobFile);
+          rJob = new RunningJob(jobId, localJobFile);
           rJob.localized = false;
           rJob.tasks = new HashSet();
           rJob.jobFile = localJobFile;
@@ -227,7 +243,7 @@ public class TaskTracker
         fConf.deleteLocalFiles(SUBDIR);
 
         // Clear out state tables
-        this.tasks = new TreeMap();
+        this.tasks.clear();
         this.runningTasks = new TreeMap();
         this.runningJobs = new TreeMap();
         this.mapTotal = 0;
@@ -324,6 +340,8 @@ public class TaskTracker
             }
             RunJar.unJar(new File(localJarFile.toString()), workDir);
           }
+          rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
+				 localJobConf.getKeepFailedTaskFiles());
           rjob.localized = true;
         }
       }
@@ -475,10 +493,16 @@ public class TaskTracker
             
             lastHeartbeat = now;
             justStarted = false;
-
-            checkAndStartNewTasks(actions);
+            if (actions != null){ 
+              for(TaskTrackerAction action: actions) {
+                if (action instanceof LaunchTaskAction) {
+                  startNewTask((LaunchTaskAction) action);
+                } else {
+                  tasksToCleanup.put(action);
+                }
+              }
+            }
             markUnresponsiveTasks();
-            closeCompletedTasks(actions);
             killOverflowingTasks();
             
             //we've cleaned up, resume normal operation
@@ -583,28 +607,6 @@ public class TaskTracker
       return false;
     }
     
-    /**
-     * Check to see if there are any new tasks that we should run.
-     * @throws IOException
-     */
-    private void checkAndStartNewTasks(TaskTrackerAction[] actions) 
-    throws IOException {
-      if (actions == null) {
-        return;
-      }
-      
-      for (TaskTrackerAction action : actions) {
-        if (action.getActionId() == 
-          TaskTrackerAction.ActionType.LAUNCH_TASK) {
-          Task t = ((LaunchTaskAction)(action)).getTask();
-          LOG.info("LaunchTaskAction: " + t.getTaskId());
-          if (t != null) {
-            startNewTask(t);
-          }
-        }
-      }
-    }
-    
     /**
      * Kill any tasks that have not reported progress in the last X seconds.
      */
@@ -621,59 +623,48 @@ public class TaskTracker
                 LOG.info(tip.getTask().getTaskId() + ": " + msg);
                 ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
                 tip.reportDiagnosticInfo(msg);
-                purgeTask(tip, false);
+                purgeTask(tip);
             }
         }
     }
 
     /**
-     * Ask the JobTracker if there are any tasks that we should clean up,
-     * either because we don't need them any more or because the job is done.
+     * The task tracker is done with this job, so we need to clean up.
+     * @param action the kill-job action naming the job to clean up
+     * @throws IOException
      */
-    private void closeCompletedTasks(TaskTrackerAction[] actions) 
-    throws IOException {
-      if (actions == null) {
-        return;
+    private void purgeJob(KillJobAction action) throws IOException {
+      String jobId = action.getJobId();
+      LOG.info("Received 'KillJobAction' for job: " + jobId);
+      RunningJob rjob = null;
+      synchronized (runningJobs) {
+        rjob = runningJobs.get(jobId);
       }
       
-      for (TaskTrackerAction action : actions) {
-        TaskTrackerAction.ActionType actionType = action.getActionId();
-        
-        if (actionType == TaskTrackerAction.ActionType.KILL_JOB) {
-          String jobId = ((KillJobAction)action).getJobId();
-          LOG.info("Received 'KillJobAction' for job: " + jobId);
-          synchronized (runningJobs) {
-            RunningJob rjob = runningJobs.get(jobId);
-            if (rjob == null) {
-              LOG.warn("Unknown job " + jobId + " being deleted.");
-            } else {
-              synchronized (rjob) {
-                int noJobTasks = rjob.tasks.size(); 
-                int taskCtr = 0;
-                
-                // Add this tips of this job to queue of tasks to be purged 
-                for (TaskInProgress tip : rjob.tasks) {
-                  // Purge the job files for the last element in rjob.tasks
-                  if (++taskCtr == noJobTasks) {
-                    tip.setPurgeJobFiles(true);
-                  }
-
-                  tasksToCleanup.put(tip);
-                }
-                
-                // Remove this job 
-                rjob.tasks.clear();
-                runningJobs.remove(jobId);
-              }
-            }
+      if (rjob == null) {
+        LOG.warn("Unknown job " + jobId + " being deleted.");
+      } else {
+        synchronized (rjob) {            
+          // Tell every tip of this job that the job has finished
+          for (TaskInProgress tip : rjob.tasks) {
+            tip.jobHasFinished();
           }
-        } else if(actionType == TaskTrackerAction.ActionType.KILL_TASK) {
-          String taskId = ((KillTaskAction)action).getTaskId();
-          LOG.info("Received KillTaskAction for task: " + taskId);
-          purgeTask(tasks.get(taskId), false);
+          // Delete the job's local directory unless the
+          // job is flagged to keep its files
+          if (!rjob.keepJobFiles){
+            fConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + JOBCACHE + 
+                                   Path.SEPARATOR +  rjob.getJobId());
+	  }
+          // Remove this job 
+          rjob.tasks.clear();
         }
       }
-    }
+
+      synchronized(runningJobs) {
+        runningJobs.remove(jobId);
+      }
+    }      
+    
     
     /**
      * Remove the tip and update all relevant state.
@@ -682,19 +673,14 @@ public class TaskTracker
      * @param purgeJobFiles <code>true</code> if the job files are to be
      *                      purged, <code>false</code> otherwise.
      */
-    private void purgeTask(TaskInProgress tip, boolean purgeJobFiles) {
+    private void purgeTask(TaskInProgress tip) throws IOException {
       if (tip != null) {
         LOG.info("About to purge task: " + tip.getTask().getTaskId());
         
-        // Cleanup the job files? 
-        tip.setPurgeJobFiles(purgeJobFiles);
-        
         // Remove the task from running jobs, 
         // removing the job if it's the last task
         removeTaskFromJob(tip.getTask().getJobId(), tip);
-        
-        // Add this tip to queue of tasks to be purged 
-        tasksToCleanup.put(tip);
+        tip.jobHasFinished();
       }
     }
 
@@ -718,7 +704,7 @@ public class TaskTracker
                          " Killing task.";
             LOG.info(killMe.getTask().getTaskId() + ": " + msg);
             killMe.reportDiagnosticInfo(msg);
-            purgeTask(killMe, false);
+            purgeTask(killMe);
           }
         }
       }
@@ -793,7 +779,9 @@ public class TaskTracker
      * All exceptions are handled locally, so that we don't mess up the
      * task tracker.
      */
-    private void startNewTask(Task t) {
+    private void startNewTask(LaunchTaskAction action) {
+      Task t = action.getTask();
+      LOG.info("LaunchTaskAction: " + t.getTaskId());
       TaskInProgress tip = new TaskInProgress(t, this.fConf);
       synchronized (this) {
         tasks.put(t.getTaskId(), tip);
@@ -862,51 +850,6 @@ public class TaskTracker
             return;
         }
     }
-
-    /**
-     * This class implements a queue that is put between producer and 
-     * consumer threads. It will grow without bound.
-     * @author Owen O'Malley
-     */
-    static private class BlockingQueue {
-      private List queue;
-      
-      /**
-       * Create an empty queue.
-       */
-      public BlockingQueue() {
-        queue = new ArrayList();
-      }
-       
-      /**
-       * Put the given object at the back of the queue.
-       * @param obj
-       */
-      public void put(Object obj) {
-        synchronized (queue) {
-          queue.add(obj);
-          queue.notify();
-        }
-      }
-      
-      /**
-       * Take the object at the front of the queue.
-       * It blocks until there is an object available.
-       * @return the head of the queue
-       */
-      public Object take() {
-        synchronized (queue) {
-          while (queue.isEmpty()) {
-            try {
-              queue.wait();
-            } catch (InterruptedException ie) {}
-          }
-          Object result = queue.get(0);
-          queue.remove(0);
-          return result;
-        }
-      }
-    }
     
     ///////////////////////////////////////////////////////
     // TaskInProgress maintains all the info for a Task that
@@ -929,9 +872,6 @@ public class TaskTracker
         private TaskStatus taskStatus ; 
         private boolean keepJobFiles;
         
-        /** Cleanup the job files when the job is complete (done/failed) */
-        private boolean purgeJobFiles = false;
-
         /**
          */
         public TaskInProgress(Task task, JobConf conf) {
@@ -995,10 +935,6 @@ public class TaskTracker
             keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
         }
         
-        public void setPurgeJobFiles(boolean purgeJobFiles) {
-          this.purgeJobFiles = purgeJobFiles;
-        }
-        
         /**
          */
         public synchronized TaskStatus createStatus() {
@@ -1158,12 +1094,6 @@ public class TaskTracker
                 LOG.warn("Error in deleting reduce temporary output",e); 
               }
             }
-            // Delete the job directory for this  
-            // task if the job is done/failed
-            if (purgeJobFiles) {
-              this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + 
-                      JOBCACHE + Path.SEPARATOR +  task.getJobId());
-            }
         }
 
         /**
@@ -1225,17 +1155,20 @@ public class TaskTracker
                        keepFailedTaskFiles)) {
                  return;
                }
-               synchronized (this) {
-                 try {
-                    runner.close();
-                 } catch (Throwable ie) {
-                 }
-               }
             }
-            this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + 
-                    JOBCACHE + Path.SEPARATOR + task.getJobId() + Path.SEPARATOR +
-                    taskId);
+            synchronized (this) {
+              try {
+                runner.close();
+                defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + 
+                                                JOBCACHE + Path.SEPARATOR + 
+                                                task.getJobId() + 
+                                                Path.SEPARATOR + taskId);
+              } catch (Throwable ie) {
+                LOG.info("Error cleaning up task runner: " + 
+                         StringUtils.stringifyException(ie));
+              }
             }
+        }
         
         public boolean equals(Object obj) {
           return (obj instanceof TaskInProgress) &&
@@ -1355,15 +1288,26 @@ public class TaskTracker
      *  The datastructure for initializing a job
      */
     static class RunningJob{
-      Path jobFile;
+      private String jobid; 
+      private Path jobFile;
       // keep this for later use
       Set<TaskInProgress> tasks;
       boolean localized;
-      
-      RunningJob(Path jobFile) {
+      boolean keepJobFiles;
+      RunningJob(String jobid, Path jobFile) {
+        this.jobid = jobid;
         localized = false;
         tasks = new HashSet();
         this.jobFile = jobFile;
+        keepJobFiles = false;
+      }
+      
+      Path getJobFile() {
+        return jobFile;
+      }
+      
+      String getJobId() {
+        return jobid;
       }
     }