|
@@ -28,6 +28,7 @@ import java.util.Collection;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
import java.util.EnumSet;
|
|
import java.util.EnumSet;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
+import java.util.Map;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
@@ -36,7 +37,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
|
|
|
|
|
import javax.crypto.SecretKey;
|
|
import javax.crypto.SecretKey;
|
|
|
|
|
|
-import org.apache.commons.lang.StringUtils;
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
@@ -692,15 +692,17 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
|
|
|
// Mark every containerStatus as being sent to AM though we may return
|
|
// Mark every containerStatus as being sent to AM though we may return
|
|
// only the ones that belong to the current attempt
|
|
// only the ones that belong to the current attempt
|
|
- boolean keepContainersAcressAttempts = this.submissionContext
|
|
|
|
|
|
+ boolean keepContainersAcrossAppAttempts = this.submissionContext
|
|
.getKeepContainersAcrossApplicationAttempts();
|
|
.getKeepContainersAcrossApplicationAttempts();
|
|
- for (NodeId nodeId:justFinishedContainers.keySet()) {
|
|
|
|
-
|
|
|
|
- // Clear and get current values
|
|
|
|
- List<ContainerStatus> finishedContainers = justFinishedContainers.put
|
|
|
|
- (nodeId, new ArrayList<ContainerStatus>());
|
|
|
|
|
|
+ for (Map.Entry<NodeId, List<ContainerStatus>> entry:
|
|
|
|
+ justFinishedContainers.entrySet()) {
|
|
|
|
+ NodeId nodeId = entry.getKey();
|
|
|
|
+ List<ContainerStatus> finishedContainers = entry.getValue();
|
|
|
|
+ if (finishedContainers.isEmpty()) {
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
|
|
- if (keepContainersAcressAttempts) {
|
|
|
|
|
|
+ if (keepContainersAcrossAppAttempts) {
|
|
returnList.addAll(finishedContainers);
|
|
returnList.addAll(finishedContainers);
|
|
} else {
|
|
} else {
|
|
// Filter out containers from previous attempt
|
|
// Filter out containers from previous attempt
|
|
@@ -712,12 +714,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- if (!finishedContainers.isEmpty()) {
|
|
|
|
- finishedContainersSentToAM.putIfAbsent(nodeId,
|
|
|
|
- new ArrayList<ContainerStatus>());
|
|
|
|
- finishedContainersSentToAM.get(nodeId).addAll(finishedContainers);
|
|
|
|
- }
|
|
|
|
|
|
+ finishedContainersSentToAM.putIfAbsent(nodeId,
|
|
|
|
+ new ArrayList<ContainerStatus>());
|
|
|
|
+ finishedContainersSentToAM.get(nodeId).addAll(finishedContainers);
|
|
}
|
|
}
|
|
|
|
+ justFinishedContainers.clear();
|
|
|
|
|
|
return returnList;
|
|
return returnList;
|
|
} finally {
|
|
} finally {
|