|
@@ -28,6 +28,7 @@ import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
import java.util.EnumSet;
|
|
|
import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
@@ -137,14 +138,12 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
private SecretKey clientTokenMasterKey = null;
|
|
|
|
|
|
private ConcurrentMap<NodeId, List<ContainerStatus>>
|
|
|
- justFinishedContainers =
|
|
|
- new ConcurrentHashMap<NodeId, List<ContainerStatus>>();
|
|
|
+ justFinishedContainers = new ConcurrentHashMap<>();
|
|
|
// Tracks the previous finished containers that are waiting to be
|
|
|
// verified as received by the AM. If the AM sends the next allocate
|
|
|
// request it implicitly acks this list.
|
|
|
private ConcurrentMap<NodeId, List<ContainerStatus>>
|
|
|
- finishedContainersSentToAM =
|
|
|
- new ConcurrentHashMap<NodeId, List<ContainerStatus>>();
|
|
|
+ finishedContainersSentToAM = new ConcurrentHashMap<>();
|
|
|
private Container masterContainer;
|
|
|
|
|
|
private float progress = 0;
|
|
@@ -696,7 +695,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
public List<ContainerStatus> getJustFinishedContainers() {
|
|
|
this.readLock.lock();
|
|
|
try {
|
|
|
- List<ContainerStatus> returnList = new ArrayList<ContainerStatus>();
|
|
|
+ List<ContainerStatus> returnList = new ArrayList<>();
|
|
|
for (Collection<ContainerStatus> containerStatusList :
|
|
|
justFinishedContainers.values()) {
|
|
|
returnList.addAll(containerStatusList);
|
|
@@ -735,7 +734,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
this.writeLock.lock();
|
|
|
|
|
|
try {
|
|
|
- List<ContainerStatus> returnList = new ArrayList<ContainerStatus>();
|
|
|
+ List<ContainerStatus> returnList = new ArrayList<>();
|
|
|
|
|
|
// A new allocate means the AM received the previously sent
|
|
|
// finishedContainers. We can ack this to NM now
|
|
@@ -743,15 +742,17 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
|
|
|
// Mark every containerStatus as being sent to AM though we may return
|
|
|
// only the ones that belong to the current attempt
|
|
|
- boolean keepContainersAcressAttempts = this.submissionContext
|
|
|
+ boolean keepContainersAcrossAppAttempts = this.submissionContext
|
|
|
.getKeepContainersAcrossApplicationAttempts();
|
|
|
- for (NodeId nodeId:justFinishedContainers.keySet()) {
|
|
|
-
|
|
|
- // Clear and get current values
|
|
|
- List<ContainerStatus> finishedContainers = justFinishedContainers.put
|
|
|
- (nodeId, new ArrayList<ContainerStatus>());
|
|
|
+ for (Map.Entry<NodeId, List<ContainerStatus>> entry :
|
|
|
+ justFinishedContainers.entrySet()) {
|
|
|
+ NodeId nodeId = entry.getKey();
|
|
|
+ List<ContainerStatus> finishedContainers = entry.getValue();
|
|
|
+ if (finishedContainers.isEmpty()) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
|
|
|
- if (keepContainersAcressAttempts) {
|
|
|
+ if (keepContainersAcrossAppAttempts) {
|
|
|
returnList.addAll(finishedContainers);
|
|
|
} else {
|
|
|
// Filter out containers from previous attempt
|
|
@@ -763,12 +764,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if (!finishedContainers.isEmpty()) {
|
|
|
- finishedContainersSentToAM.putIfAbsent(nodeId,
|
|
|
- new ArrayList<ContainerStatus>());
|
|
|
- finishedContainersSentToAM.get(nodeId).addAll(finishedContainers);
|
|
|
- }
|
|
|
+ finishedContainersSentToAM.putIfAbsent(nodeId,
|
|
|
+ new ArrayList<ContainerStatus>());
|
|
|
+ finishedContainersSentToAM.get(nodeId).addAll(finishedContainers);
|
|
|
}
|
|
|
+ justFinishedContainers.clear();
|
|
|
|
|
|
return returnList;
|
|
|
} finally {
|
|
@@ -1708,7 +1708,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|
|
finishedContainersSentToAM.put(nodeId,
|
|
|
new ArrayList<ContainerStatus>());
|
|
|
List<ContainerId> containerIdList =
|
|
|
- new ArrayList<ContainerId>(currentSentContainers.size());
|
|
|
+ new ArrayList<>(currentSentContainers.size());
|
|
|
for (ContainerStatus containerStatus : currentSentContainers) {
|
|
|
containerIdList.add(containerStatus.getContainerId());
|
|
|
}
|