Ver código fonte

HADOOP-2393. Moves the handling of dir deletions in the tasktracker to a separate thread. Contributed by Amareshwari Sriramadasu.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@663886 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 17 anos atrás
pai
commit
ab1011f933
2 arquivos alterados com 102 adições e 8 exclusões
  1. 3 0
      CHANGES.txt
  2. 99 8
      src/java/org/apache/hadoop/mapred/TaskTracker.java

+ 3 - 0
CHANGES.txt

@@ -242,6 +242,9 @@ Trunk (unreleased changes)
     from shuffle threads when it has scheduled enough, before scheduling more.
     (ddas)
 
+    HADOOP-2393. Moves the handling of dir deletions in the tasktracker to
+    a separate thread. (Amareshwari Sriramadasu via ddas)
+
   OPTIMIZATIONS
 
     HADOOP-3274. The default constructor of BytesWritable creates empty 

+ 99 - 8
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -123,7 +123,7 @@ public class TaskTracker
     
   StatusHttpServer server = null;
     
-  boolean shuttingDown = false;
+  volatile boolean shuttingDown = false;
     
   Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
   /**
@@ -159,6 +159,7 @@ public class TaskTracker
   private int finishedCount[] = new int[1];
   private MapEventsFetcherThread mapEventsFetcher;
   int workerThreads;
+  private CleanupQueue directoryCleanupThread;
   /**
    * the minimum interval between jobtracker polls
    */
@@ -296,6 +297,9 @@ public class TaskTracker
         public void run() {
           while (true) {
             try {
+              if (tasksToCleanup.isEmpty() && !isRunning()) {
+                break;
+              }
               TaskTrackerAction action = tasksToCleanup.take();
               if (action instanceof KillJobAction) {
                 purgeJob((KillJobAction) action);
@@ -394,9 +398,12 @@ public class TaskTracker
        fConf.get("mapred.tasktracker.dns.nameserver","default"));
     }
  
+    directoryCleanupThread = new CleanupQueue(fConf);
+    directoryCleanupThread.start();
+
     //check local disk
     checkLocalDirs(this.fConf.getLocalDirs());
-    fConf.deleteLocalFiles(SUBDIR);
+    directoryCleanupThread.addToQueue(getLocalFiles(fConf, SUBDIR));
 
     // Clear out state tables
     this.tasks.clear();
@@ -793,6 +800,28 @@ public class TaskTracker
     // Shutdown the fetcher thread
     this.mapEventsFetcher.interrupt();
     
+    // shutdown cleanup threads.
+    if (this.taskCleanupThread != null 
+         && this.taskCleanupThread.isAlive()) {
+      LOG.info("Stopping task cleanup thread");
+      this.taskCleanupThread.interrupt();
+      try {
+        this.taskCleanupThread.join();
+      } catch (InterruptedException ex) {
+        ex.printStackTrace();
+      }
+    }
+    if (this.directoryCleanupThread != null 
+         && this.directoryCleanupThread.isAlive()) {
+      LOG.info("Stopping directory cleanup thread");
+      this.directoryCleanupThread.interrupt();
+      try {
+        this.directoryCleanupThread.join();
+      } catch (InterruptedException ex) {
+        ex.printStackTrace();
+      }
+    }
+
     // shutdown RPC connections
     RPC.stopProxy(jobClient);
   }
@@ -1149,8 +1178,9 @@ public class TaskTracker
         // Delete the job directory for this  
         // task if the job is done/failed
         if (!rjob.keepJobFiles){
-          fConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + JOBCACHE + 
-                                 Path.SEPARATOR +  rjob.getJobID());
+          directoryCleanupThread.addToQueue(getLocalFiles(fConf, 
+                                   SUBDIR + Path.SEPARATOR + JOBCACHE + 
+                                   Path.SEPARATOR +  rjob.getJobID()));
         }
         // Remove this job 
         rjob.tasks.clear();
@@ -1925,10 +1955,11 @@ public class TaskTracker
             if (runner != null) {
               runner.close();
             }
-            defaultJobConf.deleteLocalFiles(taskDir);
+            directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+                                                            taskDir));
           } else {
-            defaultJobConf.deleteLocalFiles(taskDir + Path.SEPARATOR + 
-                                            MRConstants.WORKDIR);
+            directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+                           taskDir + Path.SEPARATOR + MRConstants.WORKDIR));
           }
         } catch (Throwable ie) {
           LOG.info("Error cleaning up task runner: " + 
@@ -2313,7 +2344,15 @@ public class TaskTracker
       System.exit(-1);
     }
   }
-    
+
+  /**
+   * True if task tracker is not shutting down.
+   * @return running
+   */
+  public boolean isRunning() {
+    return !shuttingDown;
+  }
+  
   /**
    * This class is used in TaskTracker's Jetty to serve the map outputs
    * to other nodes.
@@ -2456,4 +2495,56 @@ public class TaskTracker
       shuffleMetrics.successOutput();
     }
   }
+
+  // get the full paths of the directory in all the local disks.
+  private Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{
+    String[] localDirs = conf.getLocalDirs();
+    Path[] paths = new Path[localDirs.length];
+    FileSystem localFs = FileSystem.getLocal(conf);
+    for (int i = 0; i < localDirs.length; i++) {
+      paths[i] = new Path(localDirs[i], subdir);
+      paths[i] = paths[i].makeQualified(localFs);
+    }
+    return paths;
+  }
+
+  // cleanup queue which deletes files/directories of the paths queued up.
+  private class CleanupQueue extends Thread {
+    private LinkedBlockingQueue<Path> queue = new LinkedBlockingQueue<Path>();
+    private JobConf conf;
+    
+    public CleanupQueue(JobConf conf) throws IOException{
+      setName("Directory/File cleanup thread");
+      setDaemon(true);
+      this.conf = conf;
+    }
+
+    public void addToQueue(Path... paths) {
+      for (Path p : paths) {
+        try {
+          queue.put(p);
+        } catch (InterruptedException ie) {}
+      }
+      return;
+    }
+
+    public void run() {
+      LOG.debug("cleanup thread started");
+      Path path = null;
+      while (true) {
+        try {
+          if (queue.isEmpty() && !isRunning()) {
+            break;
+          }
+          path = queue.take();
+          // delete the path.
+          FileSystem fs = path.getFileSystem(conf);
+          fs.delete(path, true);
+        } catch (IOException e) {
+          LOG.info("Error deleting path" + path);
+        } catch (InterruptedException t) {
+        }
+      }
+    }
+  }
 }