浏览代码

YARN-10341. Yarn Service Container Completed event doesn't get processed. Contributed by Bilwa S T.

Brahma Reddy Battula 4 年之前
父节点
当前提交
dfe60392c9

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java

@@ -737,7 +737,7 @@ public class ServiceScheduler extends CompositeService {
           LOG.warn(
               "Container {} Completed. No component instance exists. exitStatus={}. diagnostics={} ",
               containerId, status.getExitStatus(), status.getDiagnostics());
-          return;
+          continue;
         }
         ComponentEvent event =
             new ComponentEvent(instance.getCompName(), CONTAINER_COMPLETED)

+ 88 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java

@@ -22,22 +22,29 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingCluster;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.DockerCredentialTokenIdentifier;
 import org.apache.hadoop.yarn.service.api.records.Artifact;
 import org.apache.hadoop.yarn.service.api.records.Component;
 import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.api.records.ServiceState;
 import org.apache.hadoop.yarn.service.component.ComponentState;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState;
@@ -47,7 +54,9 @@ import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,6 +72,8 @@ import java.util.concurrent.TimeoutException;
 
 import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 public class TestServiceAM extends ServiceTestUtils{
 
@@ -72,6 +83,9 @@ public class TestServiceAM extends ServiceTestUtils{
   private File basedir;
   YarnConfiguration conf = new YarnConfiguration();
   TestingCluster zkCluster;
+  @Rule
+  public ServiceTestUtils.ServiceFSWatcher rule =
+      new ServiceTestUtils.ServiceFSWatcher();
 
   @Before
   public void setup() throws Exception {
@@ -311,6 +325,80 @@ public class TestServiceAM extends ServiceTestUtils{
     am.stop();
   }
 
+  /**
+   * Verifies that a completed container which has no live component instance
+   * does not prevent the remaining completed containers reported in the same
+   * callback from being processed.
+   *
+   * Two completed-container statuses are delivered in one call: the first has
+   * no registered instance, the second does. Exactly one lookup of the
+   * dispatcher's event handler is expected, i.e. one dispatched event.
+   */
+  @Test
+  public void testContainerCompletedEventProcessed() throws Exception {
+    ServiceContext context = createServiceContext("abc");
+    MockServiceScheduler scheduler = new MockServiceScheduler(context);
+    try {
+      scheduler.init(conf);
+      ApplicationId appId = ApplicationId.newInstance(0, 0);
+      ApplicationAttemptId appAttemptId =
+          ApplicationAttemptId.newInstance(appId, 1);
+
+      // First container: deliberately NOT registered as a live instance, so
+      // the callback takes the "No component instance exists" branch for it.
+      ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 0);
+      ContainerStatus containerStatus1 = ContainerStatus.newInstance(
+          containerId1,
+          org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
+          "successful", 0);
+
+      // Second container: registered exactly once against a mocked instance.
+      ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 1);
+      ContainerStatus containerStatus2 = ContainerStatus.newInstance(
+          containerId2,
+          org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
+          "successful", 0);
+      ComponentInstance instance = Mockito.mock(ComponentInstance.class);
+      Mockito.doReturn("componentInstance").when(instance).getCompName();
+      scheduler.addLiveCompInstance(containerId2, instance);
+
+      // Order matters: the instance-less container must come first so that a
+      // premature exit from the loop would skip the second one.
+      List<ContainerStatus> statuses = new ArrayList<>();
+      statuses.add(containerStatus1);
+      statuses.add(containerStatus2);
+      scheduler.callbackHandler.onContainersCompleted(statuses);
+
+      // For second container event should be dispatched.
+      verify(scheduler.dispatcher, times(1)).getEventHandler();
+    } finally {
+      // Shut the metrics system down even when the test fails, so the
+      // shutdown is not skipped on the failure path.
+      DefaultMetricsSystem.shutdown();
+    }
+  }
+
+  /**
+   * Builds a service context for tests.
+   *
+   * The example application from ServiceTestUtils is given a fresh
+   * application id, the supplied name and the STARTED state, and every
+   * component is pointed at the same TARBALL artifact. The definition is then
+   * wrapped in a MockRunningServiceContext whose scheduler dispatcher is set
+   * to drain events on stop and is started before the context is returned.
+   *
+   * @param name name assigned to the service definition
+   * @return the created service context
+   * @throws Exception if building the example service or context fails
+   */
+  private ServiceContext createServiceContext(String name)
+      throws Exception {
+    Artifact tarball = new Artifact();
+    tarball.setId("1");
+    tarball.setType(Artifact.TypeEnum.TARBALL);
+
+    Service service = ServiceTestUtils.createExampleApplication();
+    ApplicationId id =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    service.setId(id.toString());
+    service.setName(name);
+    service.setState(ServiceState.STARTED);
+    for (Component component : service.getComponents()) {
+      component.setArtifact(tarball);
+    }
+
+    ServiceContext serviceContext =
+        new MockRunningServiceContext(rule, service);
+    serviceContext.scheduler.getDispatcher().setDrainEventsOnStop();
+    serviceContext.scheduler.getDispatcher().start();
+    return serviceContext;
+  }
+
+  /**
+   * ServiceScheduler variant for tests: the async dispatcher is replaced by a
+   * Mockito mock so dispatch attempts can be verified, and the AMRM client is
+   * built around a callback handler the test can invoke directly.
+   *
+   * Declared static because it uses no state of the enclosing test instance.
+   */
+  static class MockServiceScheduler extends ServiceScheduler {
+    // Mock dispatcher handed to the scheduler; assigned when the scheduler
+    // calls createAsyncDispatcher().
+    private AsyncDispatcher dispatcher;
+    // Callback handler passed to the AMRM client and driven by tests.
+    private AMRMClientCallback callbackHandler = new AMRMClientCallback();
+
+    MockServiceScheduler(ServiceContext context) {
+      super(context);
+    }
+
+    /**
+     * Returns a mocked dispatcher whose getEventHandler() yields a mocked
+     * handler, and keeps a reference to it for later verification.
+     */
+    @Override
+    protected AsyncDispatcher createAsyncDispatcher() {
+      dispatcher = Mockito.mock(AsyncDispatcher.class);
+      // Mockito.mock(Class) cannot carry the generic parameter; the mock is
+      // only ever used as an EventHandler<Event>, so the conversion is safe.
+      @SuppressWarnings("unchecked")
+      EventHandler<Event> handler = Mockito.mock(EventHandler.class);
+      Mockito.doReturn(handler).when(dispatcher).getEventHandler();
+      return dispatcher;
+    }
+
+    /**
+     * Creates the async AMRM client around this mock's callback handler,
+     * passing 1000 as the first (interval) argument.
+     */
+    @Override
+    protected AMRMClientAsync<AMRMClient.ContainerRequest> createAMRMClient() {
+      return AMRMClientAsync.createAMRMClientAsync(1000, callbackHandler);
+    }
+
+  }
+
   @Test
   public void testRecordTokensForContainers() throws Exception {
     ApplicationId applicationId = ApplicationId.newInstance(123456, 1);