Selaa lähdekoodia

YARN 9108. Fix FederationIntercepter merge home and secondary allocate response typo. Contributed by Abhishek Modi.

Botong Huang 6 vuotta sitten
vanhempi
commit
657aa433e2

+ 3 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java

@@ -1405,7 +1405,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     }
   }
 
-  private void mergeAllocateResponse(AllocateResponse homeResponse,
+  @VisibleForTesting
+  protected void mergeAllocateResponse(AllocateResponse homeResponse,
       AllocateResponse otherResponse, SubClusterId otherRMAddress) {
 
     if (otherResponse.getAMRMToken() != null) {
@@ -1467,7 +1468,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
 
       if (par1 != null && par2 != null) {
         par1.getResourceRequest().addAll(par2.getResourceRequest());
-        par2.getContainers().addAll(par2.getContainers());
+        par1.getContainers().addAll(par2.getContainers());
       }
 
       StrictPreemptionContract spar1 = homePreempMessage.getStrictContract();

+ 75 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java

@@ -22,8 +22,10 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -47,6 +49,8 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.PreemptionContainer;
+import org.apache.hadoop.yarn.api.records.PreemptionContract;
 import org.apache.hadoop.yarn.api.records.PreemptionMessage;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -893,4 +897,75 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
       }
     });
   }
+
+  @Test
+  public void testMergeAllocateResponse() {
+    ContainerId cid = ContainerId.newContainerId(attemptId, 0);
+    ContainerStatus cStatus = Records.newRecord(ContainerStatus.class);
+    cStatus.setContainerId(cid);
+    Container container =
+            Container.newInstance(cid, null, null, null, null, null);
+
+
+    AllocateResponse homeResponse = Records.newRecord(AllocateResponse.class);
+    homeResponse.setAllocatedContainers(Collections.singletonList(container));
+    homeResponse.setCompletedContainersStatuses(
+        Collections.singletonList(cStatus));
+    homeResponse.setUpdatedNodes(
+            Collections.singletonList(Records.newRecord(NodeReport.class)));
+    homeResponse.setNMTokens(
+            Collections.singletonList(Records.newRecord(NMToken.class)));
+    homeResponse.setUpdatedContainers(
+            Collections.singletonList(
+                Records.newRecord(UpdatedContainer.class)));
+    homeResponse.setUpdateErrors(Collections
+            .singletonList(Records.newRecord(UpdateContainerError.class)));
+    homeResponse.setAvailableResources(Records.newRecord(Resource.class));
+    homeResponse.setPreemptionMessage(createDummyPreemptionMessage(
+        ContainerId.newContainerId(attemptId, 0)));
+
+    AllocateResponse response = Records.newRecord(AllocateResponse.class);
+    response.setAllocatedContainers(Collections.singletonList(container));
+    response.setCompletedContainersStatuses(Collections.singletonList(cStatus));
+    response.setUpdatedNodes(
+            Collections.singletonList(Records.newRecord(NodeReport.class)));
+    response.setNMTokens(
+            Collections.singletonList(Records.newRecord(NMToken.class)));
+    response.setUpdatedContainers(
+            Collections.singletonList(
+                Records.newRecord(UpdatedContainer.class)));
+    response.setUpdateErrors(Collections
+            .singletonList(Records.newRecord(UpdateContainerError.class)));
+    response.setAvailableResources(Records.newRecord(Resource.class));
+    response.setPreemptionMessage(createDummyPreemptionMessage(
+        ContainerId.newContainerId(attemptId, 1)));
+
+    interceptor.mergeAllocateResponse(homeResponse,
+        response, SubClusterId.newInstance("SC-1"));
+
+    Assert.assertEquals(2,
+        homeResponse.getPreemptionMessage().getContract()
+            .getContainers().size());
+    Assert.assertEquals(2,
+        homeResponse.getAllocatedContainers().size());
+    Assert.assertEquals(2,
+        homeResponse.getUpdatedNodes().size());
+    Assert.assertEquals(2,
+        homeResponse.getCompletedContainersStatuses().size());
+  }
+
+  private PreemptionMessage createDummyPreemptionMessage(
+      ContainerId containerId) {
+    PreemptionMessage preemptionMessage = Records.newRecord(
+        PreemptionMessage.class);
+    PreemptionContainer container = Records.newRecord(
+        PreemptionContainer.class);
+    container.setId(containerId);
+    Set<PreemptionContainer> preemptionContainers = new HashSet<>();
+    preemptionContainers.add(container);
+    PreemptionContract contract = Records.newRecord(PreemptionContract.class);
+    contract.setContainers(preemptionContainers);
+    preemptionMessage.setContract(contract);
+    return preemptionMessage;
+  }
 }