
Merge -r 746901:746902 and 746902:746903 from trunk onto 0.20 branch. Fixes HADOOP-5285.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20@746909 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das, 16 years ago
Commit: a27888218f

CHANGES.txt (+7, -0)

@@ -638,6 +638,13 @@ Release 0.20.0 - Unreleased
     
     HADOOP-5292. Fix NPE in KFS::getBlockLocations. (Sriram Rao via lohit)    
 
+    HADOOP-5285. Fixes the issues - (1) obtainTaskCleanupTask checks whether job is
+    inited before trying to lock the JobInProgress (2) Moves the CleanupQueue class
+    outside the TaskTracker and makes it a generic class that is used by the 
+    JobTracker also for deleting the paths on the job's output fs. (3) Moves the
+    references to completedJobStore outside the block where the JobTracker is locked.
+    (ddas)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

src/mapred/org/apache/hadoop/mapred/CleanupQueue.java (+101, -0)

@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+class CleanupQueue {
+
+  public static final Log LOG =
+    LogFactory.getLog(CleanupQueue.class);
+
+  private static PathCleanupThread cleanupThread;
+
+  /**
+   * Create a singleton path-clean-up queue. It can be used to delete
+   * paths(directories/files) in a separate thread. This constructor creates a
+   * clean-up thread and also starts it as a daemon. Callers can instantiate one
+   * CleanupQueue per JVM and can use it for deleting paths. Use
+   * {@link CleanupQueue#addToQueue(JobConf, Path...)} to add paths for
+   * deletion.
+   */
+  public CleanupQueue() {
+    synchronized (PathCleanupThread.class) {
+      if (cleanupThread == null) {
+        cleanupThread = new PathCleanupThread();
+      }
+    }
+  }
+  
+  public void addToQueue(JobConf conf, Path...paths) {
+    cleanupThread.addToQueue(conf,paths);
+  }
+
+  private static class PathCleanupThread extends Thread {
+
+    static class PathAndConf {
+      JobConf conf;
+      Path path;
+      PathAndConf(JobConf conf, Path path) {
+        this.conf = conf;
+        this.path = path;
+      }
+    }
+    // cleanup queue which deletes files/directories of the paths queued up.
+    private LinkedBlockingQueue<PathAndConf> queue = new LinkedBlockingQueue<PathAndConf>();
+
+    public PathCleanupThread() {
+      setName("Directory/File cleanup thread");
+      setDaemon(true);
+      start();
+    }
+
+    public void addToQueue(JobConf conf,Path... paths) {
+      for (Path p : paths) {
+        try {
+          queue.put(new PathAndConf(conf,p));
+        } catch (InterruptedException ie) {}
+      }
+    }
+
+    public void run() {
+      LOG.debug(getName() + " started.");
+      PathAndConf pathAndConf = null;
+      while (true) {
+        try {
+          pathAndConf = queue.take();
+          // delete the path.
+          FileSystem fs = pathAndConf.path.getFileSystem(pathAndConf.conf);
+          fs.delete(pathAndConf.path, true);
+          LOG.debug("DELETED " + pathAndConf.path);
+        } catch (IOException e) {
+          LOG.warn("Error deleting path" + pathAndConf.path);
+        } catch (InterruptedException t) {
+        }
+      }
+    }
+  }
+}
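
The new class is shared JVM-wide: every CleanupQueue instance funnels into the same daemon PathCleanupThread, so a caller can construct one wherever a deletion is needed and enqueue paths without blocking. A minimal usage sketch follows; CleanupQueueExample and the method name are made up for illustration, while the constructor and addToQueue(JobConf, Path...) are from the commit:

package org.apache.hadoop.mapred;

import org.apache.hadoop.fs.Path;

// Hypothetical caller, mirroring the new CleanupQueue usage in
// JobInProgress.garbageCollect() further down in this commit.
class CleanupQueueExample {
  static void cleanUpAsync(JobConf conf, Path tempDir) {
    // Constructing is cheap after the first call: the cleanup thread is a singleton.
    new CleanupQueue().addToQueue(conf, tempDir);  // non-blocking; deletion runs on the daemon thread
  }
}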

src/mapred/org/apache/hadoop/mapred/JobInProgress.java (+29, -27)

@@ -978,35 +978,39 @@ class JobInProgress {
   /*
    * Return task cleanup attempt if any, to run on a given tracker
    */
-  public synchronized Task obtainTaskCleanupTask(TaskTrackerStatus tts, 
+  public Task obtainTaskCleanupTask(TaskTrackerStatus tts, 
                                                  boolean isMapSlot)
   throws IOException {
-    if (this.status.getRunState() != JobStatus.RUNNING || 
-        jobFailed || jobKilled) {
-      return null;
-    }
-    
-    String taskTracker = tts.getTrackerName();
-    if (!shouldRunOnTaskTracker(taskTracker)) {
+    if (!tasksInited.get()) {
       return null;
     }
-    TaskAttemptID taskid = null;
-    TaskInProgress tip = null;
-    if (isMapSlot) {
-      if (!mapCleanupTasks.isEmpty()) {
-        taskid = mapCleanupTasks.remove(0);
-        tip = maps[taskid.getTaskID().getId()];
+    synchronized (this) {
+      if (this.status.getRunState() != JobStatus.RUNNING || 
+          jobFailed || jobKilled) {
+        return null;
       }
-    } else {
-      if (!reduceCleanupTasks.isEmpty()) {
-        taskid = reduceCleanupTasks.remove(0);
-        tip = reduces[taskid.getTaskID().getId()];
+      String taskTracker = tts.getTrackerName();
+      if (!shouldRunOnTaskTracker(taskTracker)) {
+        return null;
       }
+      TaskAttemptID taskid = null;
+      TaskInProgress tip = null;
+      if (isMapSlot) {
+        if (!mapCleanupTasks.isEmpty()) {
+          taskid = mapCleanupTasks.remove(0);
+          tip = maps[taskid.getTaskID().getId()];
+        }
+      } else {
+        if (!reduceCleanupTasks.isEmpty()) {
+          taskid = reduceCleanupTasks.remove(0);
+          tip = reduces[taskid.getTaskID().getId()];
+        }
+      }
+      if (tip != null) {
+        return tip.addRunningTask(taskid, taskTracker, true);
+      }
+      return null;
     }
-    if (tip != null) {
-      return tip.addRunningTask(taskid, taskTracker, true);
-    }
-    return null;
   }
   
   public synchronized Task obtainNewLocalMapTask(TaskTrackerStatus tts,
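
Fix (1) from the changelog is visible in the hunk above: the lock-free tasksInited check now runs before the method takes the JobInProgress monitor, so heartbeat threads no longer block on jobs that are still initializing. A distilled, self-contained sketch of the pattern (class and method names are made up for illustration):

import java.util.concurrent.atomic.AtomicBoolean;

// Check-before-lock: test the lock-free init flag first, and only take the
// monitor once the object is known to be fully initialized.
class InitGuardedJob {
  private final AtomicBoolean tasksInited = new AtomicBoolean(false);

  Object obtainTask() {
    if (!tasksInited.get()) {   // cheap bail-out; no monitor acquired
      return null;
    }
    synchronized (this) {       // monitor held only for inited jobs
      return new Object();      // stands in for slot selection + addRunningTask(...)
    }
  }

  void finishInit() {
    tasksInited.set(true);      // flipped once job initialization completes
  }
}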
@@ -1111,9 +1115,6 @@ class JobInProgress {
    * @return true/false
    */
   private synchronized boolean canLaunchJobCleanupTask() {
-    if (!tasksInited.get()) {
-      return false;
-    }
     // check if the job is running
     if (status.getRunState() != JobStatus.RUNNING &&
         status.getRunState() != JobStatus.PREP) {
@@ -2444,6 +2445,7 @@ class JobInProgress {
    */
   synchronized void garbageCollect() {
     // Let the JobTracker know that a job is complete
+    jobtracker.storeCompletedJob(this);
     jobtracker.finalizeJob(this);
       
     try {
@@ -2467,8 +2469,7 @@ class JobInProgress {
       // Delete temp dfs dirs created if any, like in case of 
       // speculative exn of reduces.  
       Path tempDir = new Path(jobtracker.getSystemDir(), jobId.toString());
-      FileSystem fs = tempDir.getFileSystem(conf);
-      fs.delete(tempDir, true); 
+      new CleanupQueue().addToQueue(conf,tempDir); 
     } catch (IOException e) {
       LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
     }
@@ -2479,6 +2480,7 @@ class JobInProgress {
     this.runningMapCache = null;
     this.nonRunningReduces = null;
     this.runningReduces = null;
+
   }
 
   /**

src/mapred/org/apache/hadoop/mapred/JobTracker.java (+35, -32)

@@ -1823,9 +1823,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   synchronized void finalizeJob(JobInProgress job) {
     // Mark the 'non-running' tasks for pruning
     markCompletedJob(job);
-
-    //persists the job info in DFS
-    completedJobStatusStore.store(job);
     
     JobEndNotifier.registerNotification(job.getJobConf(), job.getStatus());
 
@@ -2856,34 +2853,41 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     setJobPriority(jobid, newPriority);
   }
                            
-  public synchronized JobProfile getJobProfile(JobID jobid) {
-    JobInProgress job = jobs.get(jobid);
-    if (job != null) {
-      return job.getProfile();
-    } else {
-      return completedJobStatusStore.readJobProfile(jobid);
+  void storeCompletedJob(JobInProgress job) {
+    //persists the job info in DFS
+    completedJobStatusStore.store(job);
+  }
+
+  public JobProfile getJobProfile(JobID jobid) {
+    synchronized (this) {
+      JobInProgress job = jobs.get(jobid);
+      if (job != null) {
+        return job.getProfile();
+      } 
     }
+    return completedJobStatusStore.readJobProfile(jobid);
   }
-  public synchronized JobStatus getJobStatus(JobID jobid) {
+  public JobStatus getJobStatus(JobID jobid) {
     if (null == jobid) {
       LOG.warn("JobTracker.getJobStatus() cannot get status for null jobid");
       return null;
     }
-    
-    JobInProgress job = jobs.get(jobid);
-    if (job != null) {
-      return job.getStatus();
-    } else {
-      return completedJobStatusStore.readJobStatus(jobid);
+    synchronized (this) {
+      JobInProgress job = jobs.get(jobid);
+      if (job != null) {
+        return job.getStatus();
+      } 
     }
+    return completedJobStatusStore.readJobStatus(jobid);
   }
-  public synchronized Counters getJobCounters(JobID jobid) {
-    JobInProgress job = jobs.get(jobid);
-    if (job != null) {
-      return job.getCounters();
-    } else {
-      return completedJobStatusStore.readCounters(jobid);
+  public Counters getJobCounters(JobID jobid) {
+    synchronized (this) {
+      JobInProgress job = jobs.get(jobid);
+      if (job != null) {
+        return job.getCounters();
+      } 
     }
+    return completedJobStatusStore.readCounters(jobid);
   }
   public synchronized TaskReport[] getMapTaskReports(JobID jobid) {
     JobInProgress job = jobs.get(jobid);
@@ -2981,18 +2985,17 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
    */
   public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
       JobID jobid, int fromEventId, int maxEvents) throws IOException{
-    TaskCompletionEvent[] events = EMPTY_EVENTS;
-
-    JobInProgress job = this.jobs.get(jobid);
-    if (null != job) {
-      if (job.inited()) {
-        events = job.getTaskCompletionEvents(fromEventId, maxEvents);
+    synchronized (this) {
+      JobInProgress job = this.jobs.get(jobid);
+      if (null != job) {
+        if (job.inited()) {
+          return job.getTaskCompletionEvents(fromEventId, maxEvents);
+        } else {
+          return EMPTY_EVENTS;
+        }
       }
     }
-    else {
-      events = completedJobStatusStore.readJobTaskCompletionEvents(jobid, fromEventId, maxEvents);
-    }
-    return events;
+    return completedJobStatusStore.readJobTaskCompletionEvents(jobid, fromEventId, maxEvents);
   }
 
   /**
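
Fix (3) follows the same shape in all of these getters: the JobTracker monitor is held only while probing the in-memory jobs map, and the DFS-backed completedJobStatusStore read runs after the lock is released. A condensed sketch of that lock narrowing (types simplified; TrackerSketch and readJobStatusFromStore are stand-ins, not commit code):

import java.util.HashMap;
import java.util.Map;

// Lock narrowing: the fast in-memory lookup happens under the monitor, the
// slow DFS-backed read happens outside it, so RPC handlers are not serialized
// behind completed-job I/O.
class TrackerSketch {
  private final Map<String, String> jobs = new HashMap<String, String>();

  String getJobStatus(String jobid) {
    synchronized (this) {                  // held only for the map probe
      String live = jobs.get(jobid);
      if (live != null) {
        return live;
      }
    }
    return readJobStatusFromStore(jobid);  // unlocked; may touch DFS
  }

  private String readJobStatusFromStore(String jobid) {
    return "COMPLETED " + jobid;           // stand-in for completedJobStatusStore.readJobStatus
  }
}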

src/mapred/org/apache/hadoop/mapred/TaskTracker.java (+8, -44)

@@ -972,9 +972,7 @@ public class TaskTracker
   private void startCleanupThreads() throws IOException {
     taskCleanupThread.setDaemon(true);
     taskCleanupThread.start();
-    directoryCleanupThread = new CleanupQueue(originalConf);
-    directoryCleanupThread.setDaemon(true);
-    directoryCleanupThread.start();
+    directoryCleanupThread = new CleanupQueue();
   }
   
   /**
@@ -1422,7 +1420,7 @@ public class TaskTracker
         // Delete the job directory for this  
         // task if the job is done/failed
         if (!rjob.keepJobFiles){
-          directoryCleanupThread.addToQueue(getLocalFiles(fConf, 
+          directoryCleanupThread.addToQueue(fConf, getLocalFiles(fConf, 
             getLocalJobDir(rjob.getJobID().toString())));
         }
         // Remove this job 
@@ -2487,17 +2485,20 @@ public class TaskTracker
             //might be using the dir. The JVM running the tasks would clean
             //the workdir per a task in the task process itself.
             if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-              directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+              directoryCleanupThread.addToQueue(defaultJobConf,
+                  getLocalFiles(defaultJobConf,
                   taskDir));
             }  
             
             else {
-              directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+              directoryCleanupThread.addToQueue(defaultJobConf,
+                  getLocalFiles(defaultJobConf,
                 taskDir+"/job.xml"));
             }
           } else {
             if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-              directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+              directoryCleanupThread.addToQueue(defaultJobConf,
+                  getLocalFiles(defaultJobConf,
                   taskDir+"/work"));
             }  
           }
@@ -3026,43 +3027,6 @@ public class TaskTracker
     return paths;
   }
 
-  // cleanup queue which deletes files/directories of the paths queued up.
-  private static class CleanupQueue extends Thread {
-    private LinkedBlockingQueue<Path> queue = new LinkedBlockingQueue<Path>();
-    private JobConf conf;
-    
-    public CleanupQueue(JobConf conf) throws IOException{
-      setName("Directory/File cleanup thread");
-      setDaemon(true);
-      this.conf = conf;
-    }
-
-    public void addToQueue(Path... paths) {
-      for (Path p : paths) {
-        try {
-          queue.put(p);
-        } catch (InterruptedException ie) {}
-      }
-      return;
-    }
-
-    public void run() {
-      LOG.debug("cleanup thread started");
-      Path path = null;
-      while (true) {
-        try {
-          path = queue.take();
-          // delete the path.
-          FileSystem fs = path.getFileSystem(conf);
-          fs.delete(path, true);
-        } catch (IOException e) {
-          LOG.info("Error deleting path" + path);
-        } catch (InterruptedException t) {
-        }
-      }
-    }
-  }
-
   int getMaxCurrentMapTasks() {
     return maxCurrentMapTasks;
   }
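
One design note on fix (2): the old inner CleanupQueue captured a single JobConf at construction time, so it could only resolve paths against the TaskTracker's local configuration. The replacement takes a conf per addToQueue call and resolves the FileSystem per path, which is what lets the JobTracker reuse the same queue for directories on a job's output fs.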