|
@@ -23,14 +23,14 @@ import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
-import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.SortedMap;
|
|
|
import java.util.TreeMap;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.ConcurrentMap;
|
|
|
import java.util.concurrent.ConcurrentSkipListMap;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
import java.util.concurrent.ThreadFactory;
|
|
@@ -77,7 +77,7 @@ public class HistoryFileManager extends AbstractService {
|
|
|
private static enum HistoryInfoState {
|
|
|
IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED
|
|
|
};
|
|
|
-
|
|
|
+
|
|
|
private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils
|
|
|
.doneSubdirsBeforeSerialTail();
|
|
|
|
|
@@ -199,6 +199,29 @@ public class HistoryFileManager extends AbstractService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * This class represents a user dir in the intermediate done directory. This
|
|
|
+ * is mostly for locking purposes.
|
|
|
+ */
|
|
|
+ private class UserLogDir {
|
|
|
+ long modTime = 0;
|
|
|
+
|
|
|
+ public synchronized void scanIfNeeded(FileStatus fs) {
|
|
|
+ long newModTime = fs.getModificationTime();
|
|
|
+ if (modTime != newModTime) {
|
|
|
+ Path p = fs.getPath();
|
|
|
+ try {
|
|
|
+ scanIntermediateDirectory(p);
|
|
|
+ //If scanning fails, we will scan again. We assume the failure is
|
|
|
+ // temporary.
|
|
|
+ modTime = newModTime;
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("Error while trying to scan the directory " + p, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public class HistoryFileInfo {
|
|
|
private Path historyFile;
|
|
|
private Path confFile;
|
|
@@ -352,7 +375,8 @@ public class HistoryFileManager extends AbstractService {
|
|
|
* Maintains a mapping between intermediate user directories and the last
|
|
|
* known modification time.
|
|
|
*/
|
|
|
- private Map<String, Long> userDirModificationTimeMap = new HashMap<String, Long>();
|
|
|
+ private ConcurrentMap<String, UserLogDir> userDirModificationTimeMap =
|
|
|
+ new ConcurrentHashMap<String, UserLogDir>();
|
|
|
|
|
|
private JobACLsManager aclsMgr;
|
|
|
|
|
@@ -584,23 +608,15 @@ public class HistoryFileManager extends AbstractService {
|
|
|
|
|
|
for (FileStatus userDir : userDirList) {
|
|
|
String name = userDir.getPath().getName();
|
|
|
- long newModificationTime = userDir.getModificationTime();
|
|
|
- boolean shouldScan = false;
|
|
|
- synchronized (userDirModificationTimeMap) {
|
|
|
- if (!userDirModificationTimeMap.containsKey(name)
|
|
|
- || newModificationTime > userDirModificationTimeMap.get(name)) {
|
|
|
- shouldScan = true;
|
|
|
- userDirModificationTimeMap.put(name, newModificationTime);
|
|
|
- }
|
|
|
- }
|
|
|
- if (shouldScan) {
|
|
|
- try {
|
|
|
- scanIntermediateDirectory(userDir.getPath());
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.error("Error while trying to scan the directory "
|
|
|
- + userDir.getPath(), e);
|
|
|
+ UserLogDir dir = userDirModificationTimeMap.get(name);
|
|
|
+ if(dir == null) {
|
|
|
+ dir = new UserLogDir();
|
|
|
+ UserLogDir old = userDirModificationTimeMap.putIfAbsent(name, dir);
|
|
|
+ if(old != null) {
|
|
|
+ dir = old;
|
|
|
}
|
|
|
}
|
|
|
+ dir.scanIfNeeded(userDir);
|
|
|
}
|
|
|
}
|
|
|
|