Browse Source

svn merge -c 1391671 FIXES: MAPREDUCE-4691. Historyserver can report "Unknown job" after RM says job has completed. Contributed by Robert Joseph Evans.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1391677 13f79535-47bb-0310-9956-ffa450edef68
Jason Darrell Lowe 12 years ago
parent
commit
ee7192ac07

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -35,6 +35,9 @@ Release 0.23.4 - UNRELEASED
     MAPREDUCE-4646. Fixed MR framework to send diagnostic information correctly
     MAPREDUCE-4646. Fixed MR framework to send diagnostic information correctly
     to clients in case of failed jobs also. (Jason Lowe via vinodkv)
     to clients in case of failed jobs also. (Jason Lowe via vinodkv)
 
 
+    MAPREDUCE-4691. Historyserver can report "Unknown job" after RM says job
+    has completed. (Robert Joseph Evans via jlowe)
+
 Release 0.23.3 - UNRELEASED
 Release 0.23.3 - UNRELEASED
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 35 - 19
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java

@@ -23,14 +23,14 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadFactory;
@@ -77,7 +77,7 @@ public class HistoryFileManager extends AbstractService {
   private static enum HistoryInfoState {
   private static enum HistoryInfoState {
     IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED
     IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED
   };
   };
-
+  
   private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils
   private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils
       .doneSubdirsBeforeSerialTail();
       .doneSubdirsBeforeSerialTail();
 
 
@@ -199,6 +199,29 @@ public class HistoryFileManager extends AbstractService {
     }
     }
   }
   }
 
 
+  /**
+   * Represents one user directory inside the intermediate done directory and
+   * remembers the modification time observed at its last successful scan.
+   * It exists mostly for locking: scanIfNeeded is synchronized on this
+   * object, so only one thread at a time runs the check and scan for a given
+   * instance, and later callers wait for a scan in progress to finish.
+   */
+  private class UserLogDir {
+    long modTime = 0; // modification time at the last successful scan
+    
+    public synchronized void scanIfNeeded(FileStatus fs) {
+      long newModTime = fs.getModificationTime();
+      if (modTime != newModTime) { // any change, not only a newer time
+        Path p = fs.getPath();
+        try {
+          scanIntermediateDirectory(p);
+          // modTime is only updated after a successful scan, so a failed
+          // scan is retried on the next call; failures are assumed temporary.
+          modTime = newModTime;
+        } catch (IOException e) {
+          LOG.error("Error while trying to scan the directory " + p, e);
+        }
+      }
+    }
+  }
+  
   public class HistoryFileInfo {
   public class HistoryFileInfo {
     private Path historyFile;
     private Path historyFile;
     private Path confFile;
     private Path confFile;
@@ -352,7 +375,8 @@ public class HistoryFileManager extends AbstractService {
    * Maintains a mapping between intermediate user directories and the last
    * Maintains a mapping between intermediate user directories and the last
    * known modification time.
    * known modification time.
    */
    */
-  private Map<String, Long> userDirModificationTimeMap = new HashMap<String, Long>();
+  private ConcurrentMap<String, UserLogDir> userDirModificationTimeMap = 
+    new ConcurrentHashMap<String, UserLogDir>();
 
 
   private JobACLsManager aclsMgr;
   private JobACLsManager aclsMgr;
 
 
@@ -586,23 +610,15 @@ public class HistoryFileManager extends AbstractService {
 
 
     for (FileStatus userDir : userDirList) {
     for (FileStatus userDir : userDirList) {
       String name = userDir.getPath().getName();
       String name = userDir.getPath().getName();
-      long newModificationTime = userDir.getModificationTime();
-      boolean shouldScan = false;
-      synchronized (userDirModificationTimeMap) {
-        if (!userDirModificationTimeMap.containsKey(name)
-            || newModificationTime > userDirModificationTimeMap.get(name)) {
-          shouldScan = true;
-          userDirModificationTimeMap.put(name, newModificationTime);
-        }
-      }
-      if (shouldScan) {
-        try {
-          scanIntermediateDirectory(userDir.getPath());
-        } catch (IOException e) {
-          LOG.error("Error while trying to scan the directory " 
-              + userDir.getPath(), e);
+      UserLogDir dir = userDirModificationTimeMap.get(name);
+      if(dir == null) {
+        dir = new UserLogDir();
+        UserLogDir old = userDirModificationTimeMap.putIfAbsent(name, dir);
+        if(old != null) {
+          dir = old;
         }
         }
       }
       }
+      dir.scanIfNeeded(userDir);
     }
     }
   }
   }