Explorar o código

YARN-6962. Add support for updateContainers when allocating using FederationInterceptor. (Botong Huang via Subru).

Subru Krishnan %!s(int64=7) %!d(string=hai) anos
pai
achega
ca669f9f8b

+ 57 - 29
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java

@@ -540,30 +540,33 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       }
       }
     }
     }
 
 
-    if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty(
-        request.getResourceBlacklistRequest().getBlacklistAdditions())) {
-      for (String resourceName : request.getResourceBlacklistRequest()
-          .getBlacklistAdditions()) {
-        SubClusterId subClusterId = getSubClusterForNode(resourceName);
-        if (subClusterId != null) {
-          AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
-              subClusterId, request, requestMap);
-          newRequest.getResourceBlacklistRequest().getBlacklistAdditions()
-              .add(resourceName);
+    if (request.getResourceBlacklistRequest() != null) {
+      if (!isNullOrEmpty(
+          request.getResourceBlacklistRequest().getBlacklistAdditions())) {
+        for (String resourceName : request.getResourceBlacklistRequest()
+            .getBlacklistAdditions()) {
+          SubClusterId subClusterId = getSubClusterForNode(resourceName);
+          if (subClusterId != null) {
+            AllocateRequest newRequest =
+                findOrCreateAllocateRequestForSubCluster(subClusterId, request,
+                    requestMap);
+            newRequest.getResourceBlacklistRequest().getBlacklistAdditions()
+                .add(resourceName);
+          }
         }
         }
       }
       }
-    }
-
-    if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty(
-        request.getResourceBlacklistRequest().getBlacklistRemovals())) {
-      for (String resourceName : request.getResourceBlacklistRequest()
-          .getBlacklistRemovals()) {
-        SubClusterId subClusterId = getSubClusterForNode(resourceName);
-        if (subClusterId != null) {
-          AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
-              subClusterId, request, requestMap);
-          newRequest.getResourceBlacklistRequest().getBlacklistRemovals()
-              .add(resourceName);
+      if (!isNullOrEmpty(
+          request.getResourceBlacklistRequest().getBlacklistRemovals())) {
+        for (String resourceName : request.getResourceBlacklistRequest()
+            .getBlacklistRemovals()) {
+          SubClusterId subClusterId = getSubClusterForNode(resourceName);
+          if (subClusterId != null) {
+            AllocateRequest newRequest =
+                findOrCreateAllocateRequestForSubCluster(subClusterId, request,
+                    requestMap);
+            newRequest.getResourceBlacklistRequest().getBlacklistRemovals()
+                .add(resourceName);
+          }
         }
         }
       }
       }
     }
     }
@@ -896,13 +899,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       }
       }
     }
     }
 
 
-    if (!isNullOrEmpty(otherResponse.getNMTokens())) {
-      if (!isNullOrEmpty(homeResponse.getNMTokens())) {
-        homeResponse.getNMTokens().addAll(otherResponse.getNMTokens());
-      } else {
-        homeResponse.setNMTokens(otherResponse.getNMTokens());
-      }
-    }
+    homeResponse.setNumClusterNodes(
+        homeResponse.getNumClusterNodes() + otherResponse.getNumClusterNodes());
 
 
     PreemptionMessage homePreempMessage = homeResponse.getPreemptionMessage();
     PreemptionMessage homePreempMessage = homeResponse.getPreemptionMessage();
     PreemptionMessage otherPreempMessage = otherResponse.getPreemptionMessage();
     PreemptionMessage otherPreempMessage = otherResponse.getPreemptionMessage();
@@ -935,6 +933,31 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
         spar1.getContainers().addAll(spar2.getContainers());
         spar1.getContainers().addAll(spar2.getContainers());
       }
       }
     }
     }
+
+    if (!isNullOrEmpty(otherResponse.getNMTokens())) {
+      if (!isNullOrEmpty(homeResponse.getNMTokens())) {
+        homeResponse.getNMTokens().addAll(otherResponse.getNMTokens());
+      } else {
+        homeResponse.setNMTokens(otherResponse.getNMTokens());
+      }
+    }
+
+    if (!isNullOrEmpty(otherResponse.getUpdatedContainers())) {
+      if (!isNullOrEmpty(homeResponse.getUpdatedContainers())) {
+        homeResponse.getUpdatedContainers()
+            .addAll(otherResponse.getUpdatedContainers());
+      } else {
+        homeResponse.setUpdatedContainers(otherResponse.getUpdatedContainers());
+      }
+    }
+
+    if (!isNullOrEmpty(otherResponse.getUpdateErrors())) {
+      if (!isNullOrEmpty(homeResponse.getUpdateErrors())) {
+        homeResponse.getUpdateErrors().addAll(otherResponse.getUpdateErrors());
+      } else {
+        homeResponse.setUpdateErrors(otherResponse.getUpdateErrors());
+      }
+    }
   }
   }
 
 
   /**
   /**
@@ -1052,6 +1075,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     return this.uamPool.getAllUAMIds().size();
     return this.uamPool.getAllUAMIds().size();
   }
   }
 
 
+  @VisibleForTesting
+  public Map<SubClusterId, List<AllocateResponse>> getAsyncResponseSink() {
+    return this.asyncResponseSink;
+  }
+
   /**
   /**
    * Private structure for encapsulating SubClusterId and
    * Private structure for encapsulating SubClusterId and
    * RegisterApplicationMasterResponse instances.
    * RegisterApplicationMasterResponse instances.

+ 54 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java

@@ -20,7 +20,9 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorCompletionService;
@@ -36,8 +38,15 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerError;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -493,4 +502,49 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     } catch (YarnException e) {
     } catch (YarnException e) {
     }
     }
   }
   }
+
+  @Test
+  public void testAllocateResponse() throws Exception {
+    interceptor.registerApplicationMaster(
+        RegisterApplicationMasterRequest.newInstance(null, 0, null));
+    AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
+
+    Map<SubClusterId, List<AllocateResponse>> asyncResponseSink =
+        interceptor.getAsyncResponseSink();
+
+    ContainerId cid = ContainerId.newContainerId(attemptId, 0);
+    ContainerStatus cStatus = Records.newRecord(ContainerStatus.class);
+    cStatus.setContainerId(cid);
+    Container container =
+        Container.newInstance(cid, null, null, null, null, null);
+
+    AllocateResponse response = Records.newRecord(AllocateResponse.class);
+    response.setAllocatedContainers(Collections.singletonList(container));
+    response.setCompletedContainersStatuses(Collections.singletonList(cStatus));
+    response.setUpdatedNodes(
+        Collections.singletonList(Records.newRecord(NodeReport.class)));
+    response.setNMTokens(
+        Collections.singletonList(Records.newRecord(NMToken.class)));
+    response.setUpdatedContainers(
+        Collections.singletonList(Records.newRecord(UpdatedContainer.class)));
+    response.setUpdateErrors(Collections
+        .singletonList(Records.newRecord(UpdateContainerError.class)));
+    response.setAvailableResources(Records.newRecord(Resource.class));
+    response.setPreemptionMessage(Records.newRecord(PreemptionMessage.class));
+
+    List<AllocateResponse> list = new ArrayList<>();
+    list.add(response);
+    asyncResponseSink.put(SubClusterId.newInstance("SC-1"), list);
+
+    response = interceptor.allocate(allocateRequest);
+
+    Assert.assertEquals(1, response.getAllocatedContainers().size());
+    Assert.assertNotNull(response.getAvailableResources());
+    Assert.assertEquals(1, response.getCompletedContainersStatuses().size());
+    Assert.assertEquals(1, response.getUpdatedNodes().size());
+    Assert.assertNotNull(response.getPreemptionMessage());
+    Assert.assertEquals(1, response.getNMTokens().size());
+    Assert.assertEquals(1, response.getUpdatedContainers().size());
+    Assert.assertEquals(1, response.getUpdateErrors().size());
+  }
 }
 }