Browse Source

HADOOP-3546. TaskTracker re-initialization gets stuck in cleaning up. Contributed by Amareshwari Sriramadasu.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@669547 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 17 years ago
parent
commit
18b39a712b
2 changed files with 15 additions and 46 deletions
  1. 3 0
      CHANGES.txt
  2. 12 46
      src/mapred/org/apache/hadoop/mapred/TaskTracker.java

+ 3 - 0
CHANGES.txt

@@ -628,6 +628,9 @@ Release 0.18.0 - Unreleased
     HADOOP-3534. Log IOExceptions that happen in closing the name
     HADOOP-3534. Log IOExceptions that happen in closing the name
     system when the NameNode shuts down. (Tsz Wo (Nicholas) Sze via omalley)
     system when the NameNode shuts down. (Tsz Wo (Nicholas) Sze via omalley)
 
 
+    HADOOP-3546. TaskTracker re-initialization gets stuck in cleaning up.
+    (Amareshwari Sriramadasu via ddas)
+
 Release 0.17.1 - Unreleased
 Release 0.17.1 - Unreleased
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 12 - 46
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -297,9 +297,6 @@ public class TaskTracker
         public void run() {
         public void run() {
           while (true) {
           while (true) {
             try {
             try {
-              if (tasksToCleanup.isEmpty() && !isRunning()) {
-                break;
-              }
               TaskTrackerAction action = tasksToCleanup.take();
               TaskTrackerAction action = tasksToCleanup.take();
               if (action instanceof KillJobAction) {
               if (action instanceof KillJobAction) {
                 purgeJob((KillJobAction) action);
                 purgeJob((KillJobAction) action);
@@ -322,10 +319,6 @@ public class TaskTracker
           }
           }
         }
         }
       }, "taskCleanup");
       }, "taskCleanup");
-  {
-    taskCleanupThread.setDaemon(true);
-    taskCleanupThread.start();
-  }
     
     
   private RunningJob addTaskToJob(JobID jobId, 
   private RunningJob addTaskToJob(JobID jobId, 
                                   Path localJobFile,
                                   Path localJobFile,
@@ -398,12 +391,9 @@ public class TaskTracker
        fConf.get("mapred.tasktracker.dns.nameserver","default"));
        fConf.get("mapred.tasktracker.dns.nameserver","default"));
     }
     }
  
  
-    directoryCleanupThread = new CleanupQueue(fConf);
-    directoryCleanupThread.start();
-
     //check local disk
     //check local disk
     checkLocalDirs(this.fConf.getLocalDirs());
     checkLocalDirs(this.fConf.getLocalDirs());
-    directoryCleanupThread.addToQueue(getLocalFiles(fConf, SUBDIR));
+    fConf.deleteLocalFiles(SUBDIR);
 
 
     // Clear out state tables
     // Clear out state tables
     this.tasks.clear();
     this.tasks.clear();
@@ -458,7 +448,6 @@ public class TaskTracker
                        InterTrackerProtocol.versionID, 
                        InterTrackerProtocol.versionID, 
                        jobTrackAddr, this.fConf);
                        jobTrackAddr, this.fConf);
         
         
-    this.running = true;
     // start the thread that will fetch map task completion events
     // start the thread that will fetch map task completion events
     this.mapEventsFetcher = new MapEventsFetcherThread();
     this.mapEventsFetcher = new MapEventsFetcherThread();
     mapEventsFetcher.setDaemon(true);
     mapEventsFetcher.setDaemon(true);
@@ -801,28 +790,6 @@ public class TaskTracker
     // Shutdown the fetcher thread
     // Shutdown the fetcher thread
     this.mapEventsFetcher.interrupt();
     this.mapEventsFetcher.interrupt();
     
     
-    // shutdown cleanup threads.
-    if (this.taskCleanupThread != null 
-         && this.taskCleanupThread.isAlive()) {
-      LOG.info("Stopping task cleanup thread");
-      this.taskCleanupThread.interrupt();
-      try {
-        this.taskCleanupThread.join();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
-      }
-    }
-    if (this.directoryCleanupThread != null 
-         && this.directoryCleanupThread.isAlive()) {
-      LOG.info("Stopping directory cleanup thread");
-      this.directoryCleanupThread.interrupt();
-      try {
-        this.directoryCleanupThread.join();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
-      }
-    }
-
     // shutdown RPC connections
     // shutdown RPC connections
     RPC.stopProxy(jobClient);
     RPC.stopProxy(jobClient);
   }
   }
@@ -867,6 +834,14 @@ public class TaskTracker
     initialize();
     initialize();
   }
   }
 
 
+  private void startCleanupThreads() throws IOException {
+    taskCleanupThread.setDaemon(true);
+    taskCleanupThread.start();
+    directoryCleanupThread = new CleanupQueue(originalConf);
+    directoryCleanupThread.setDaemon(true);
+    directoryCleanupThread.start();
+  }
+  
   /**
   /**
    * The connection to the JobTracker, used by the TaskRunner 
    * The connection to the JobTracker, used by the TaskRunner 
    * for locating remote files.
    * for locating remote files.
@@ -1355,6 +1330,8 @@ public class TaskTracker
    */
    */
   public void run() {
   public void run() {
     try {
     try {
+      startCleanupThreads();
+      this.running = true;
       boolean denied = false;
       boolean denied = false;
       while (running && !shuttingDown && !denied) {
       while (running && !shuttingDown && !denied) {
         boolean staleState = false;
         boolean staleState = false;
@@ -2346,14 +2323,6 @@ public class TaskTracker
     }
     }
   }
   }
 
 
-  /**
-   * True if task tracker is not shutting down.
-   * @return running
-   */
-  public boolean isRunning() {
-    return !shuttingDown;
-  }
-  
   /**
   /**
    * This class is used in TaskTracker's Jetty to serve the map outputs
    * This class is used in TaskTracker's Jetty to serve the map outputs
    * to other nodes.
    * to other nodes.
@@ -2510,7 +2479,7 @@ public class TaskTracker
   }
   }
 
 
   // cleanup queue which deletes files/directories of the paths queued up.
   // cleanup queue which deletes files/directories of the paths queued up.
-  private class CleanupQueue extends Thread {
+  private static class CleanupQueue extends Thread {
     private LinkedBlockingQueue<Path> queue = new LinkedBlockingQueue<Path>();
     private LinkedBlockingQueue<Path> queue = new LinkedBlockingQueue<Path>();
     private JobConf conf;
     private JobConf conf;
     
     
@@ -2534,9 +2503,6 @@ public class TaskTracker
       Path path = null;
       Path path = null;
       while (true) {
       while (true) {
         try {
         try {
-          if (queue.isEmpty() && !isRunning()) {
-            break;
-          }
           path = queue.take();
           path = queue.take();
           // delete the path.
           // delete the path.
           FileSystem fs = path.getFileSystem(conf);
           FileSystem fs = path.getFileSystem(conf);