Browse Source

YARN-7835. Race condition in NM while publishing events if second attempt is launched on the same node. (Rohith Sharma K S via Haibo Chen)

Haibo Chen 7 years ago
parent
commit
d1274c3b71

+ 43 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java

@@ -19,7 +19,12 @@
 package org.apache.hadoop.yarn.server.timelineservice.collector;
 
 import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -31,6 +36,7 @@ import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
@@ -59,6 +65,8 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
   private final NodeTimelineCollectorManager collectorManager;
   private long collectorLingerPeriod;
   private ScheduledExecutorService scheduler;
+  // Application master containers seen on this node, grouped by application.
+  // initializeContainer and the scheduled removal task both synchronize on
+  // this map instance, so the reference is final to keep that monitor stable.
+  private final Map<ApplicationId, Set<ContainerId>> appIdToContainerId =
+      new ConcurrentHashMap<>();
 
   public PerNodeTimelineCollectorsAuxService() {
     this(new NodeTimelineCollectorManager(true));
@@ -148,7 +156,15 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
     if (context.getContainerType() == ContainerType.APPLICATION_MASTER) {
       ApplicationId appId = context.getContainerId().
           getApplicationAttemptId().getApplicationId();
-      addApplication(appId, context.getUser());
+      synchronized (appIdToContainerId) {
+        Set<ContainerId> masterContainers = appIdToContainerId.get(appId);
+        if (masterContainers == null) {
+          masterContainers = new HashSet<>();
+          appIdToContainerId.put(appId, masterContainers);
+        }
+        masterContainers.add(context.getContainerId());
+        addApplication(appId, context.getUser());
+      }
     }
   }
 
@@ -162,16 +178,35 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
     // intercept the event of the AM container being stopped and remove the app
     // level collector service
     if (context.getContainerType() == ContainerType.APPLICATION_MASTER) {
-      final ApplicationId appId =
-          context.getContainerId().getApplicationAttemptId().getApplicationId();
-      scheduler.schedule(new Runnable() {
-        public void run() {
-          removeApplication(appId);
-        }
-      }, collectorLingerPeriod, TimeUnit.MILLISECONDS);
+      final ContainerId containerId = context.getContainerId();
+      removeApplicationCollector(containerId);
     }
   }
 
+  /**
+   * Schedules removal of the application-level collector after the given
+   * application master container has stopped.
+   *
+   * <p>The removal task is submitted to {@code scheduler} with a delay of
+   * {@code collectorLingerPeriod} milliseconds. When it runs, it drops
+   * {@code containerId} from the set of master containers tracked for the
+   * application and calls {@code removeApplication} only if that set has
+   * become empty, so a collector that another attempt's master container on
+   * this node still relies on is left in place.
+   *
+   * @param containerId the application master container that was stopped
+   * @return a future for the scheduled removal task
+   */
+  @VisibleForTesting
+  protected Future<?> removeApplicationCollector(
+      final ContainerId containerId) {
+    final ApplicationId appId =
+        containerId.getApplicationAttemptId().getApplicationId();
+    return scheduler.schedule(new Runnable() {
+      @Override
+      public void run() {
+        // Same monitor as the registration in initializeContainer, so the
+        // check-then-remove below cannot interleave with a new master
+        // container being recorded for this application.
+        synchronized (appIdToContainerId) {
+          Set<ContainerId> masterContainers = appIdToContainerId.get(appId);
+          if (masterContainers == null) {
+            LOG.info("Stop container for " + containerId
+                + " is called before initializing container.");
+            return;
+          }
+          masterContainers.remove(containerId);
+          if (masterContainers.isEmpty()) {
+            // remove only if it is last master container
+            removeApplication(appId);
+            appIdToContainerId.remove(appId);
+          }
+        }
+      }
+    }, collectorLingerPeriod, TimeUnit.MILLISECONDS);
+  }
+
   @VisibleForTesting
   boolean hasApplication(ApplicationId appId) {
     return collectorManager.containsTimelineCollector(appId);

+ 77 - 16
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java

@@ -29,6 +29,7 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ExitUtil;
@@ -47,16 +48,17 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorCon
 import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestPerNodeTimelineCollectorsAuxService {
   private ApplicationAttemptId appAttemptId;
   private PerNodeTimelineCollectorsAuxService auxService;
   private Configuration conf;
+  private ApplicationId appId;
 
   public TestPerNodeTimelineCollectorsAuxService() {
-    ApplicationId appId =
-        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
     appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
     conf = new YarnConfiguration();
     // enable timeline service v.2
@@ -107,15 +109,6 @@ public class TestPerNodeTimelineCollectorsAuxService {
     when(context.getContainerType()).thenReturn(
         ContainerType.APPLICATION_MASTER);
     auxService.stopContainer(context);
-    // auxService should have the app's collector and need to remove only after
-    // a configured period
-    assertTrue(auxService.hasApplication(appAttemptId.getApplicationId()));
-    for (int i = 0; i < 4; i++) {
-      Thread.sleep(500L);
-      if (!auxService.hasApplication(appAttemptId.getApplicationId())) {
-        break;
-      }
-    }
 
     // auxService should not have that app
     assertFalse(auxService.hasApplication(appAttemptId.getApplicationId()));
@@ -155,21 +148,53 @@ public class TestPerNodeTimelineCollectorsAuxService {
   private PerNodeTimelineCollectorsAuxService
       createCollectorAndAddApplication() {
     PerNodeTimelineCollectorsAuxService service = createCollector();
+
+    ContainerInitializationContext context =
+        createContainerInitalizationContext(1);
+    service.initializeContainer(context); // AM container of attempt 1
+    return service;
+  }
+
+  ContainerInitializationContext createContainerInitalizationContext(
+      int attempt) {
+    appAttemptId = ApplicationAttemptId.newInstance(appId, attempt); // also updates the shared field
     // create an AM container
     ContainerId containerId = getAMContainerId();
     ContainerInitializationContext context =
         mock(ContainerInitializationContext.class);
     when(context.getContainerId()).thenReturn(containerId);
-    when(context.getContainerType()).thenReturn(
-        ContainerType.APPLICATION_MASTER);
-    service.initializeContainer(context);
-    return service;
+    when(context.getContainerType())
+        .thenReturn(ContainerType.APPLICATION_MASTER);
+    return context; // mock only; the caller passes it to initializeContainer
+  }
+
+  ContainerTerminationContext createContainerTerminationContext(int attempt) {
+    appAttemptId = ApplicationAttemptId.newInstance(appId, attempt);
+    // create an AM container id (presumably under the attempt set above)
+    ContainerId containerId = getAMContainerId();
+    ContainerTerminationContext context =
+        mock(ContainerTerminationContext.class);
+    when(context.getContainerId()).thenReturn(containerId);
+    when(context.getContainerType())
+        .thenReturn(ContainerType.APPLICATION_MASTER);
+    return context; // mock only; the caller passes it to stopContainer
   }
 
   private PerNodeTimelineCollectorsAuxService createCollector() {
     NodeTimelineCollectorManager collectorManager = createCollectorManager();
     PerNodeTimelineCollectorsAuxService service =
-        spy(new PerNodeTimelineCollectorsAuxService(collectorManager));
+        spy(new PerNodeTimelineCollectorsAuxService(collectorManager) {
+          // Wait for the scheduled removal to finish so that tests can
+          // assert on collector state as soon as stopContainer() returns.
+          @Override
+          protected Future<?> removeApplicationCollector(
+              ContainerId containerId) {
+            Future<?> future = super.removeApplicationCollector(containerId);
+            try {
+              future.get();
+            } catch (Exception e) {
+              if (e instanceof InterruptedException) {
+                // keep the interrupt visible to whoever runs the test
+                Thread.currentThread().interrupt();
+              }
+              // AssertionError(String, Throwable) keeps the root cause,
+              // which Assert.fail(String) would discard.
+              throw new AssertionError(
+                  "Exception thrown while removing collector", e);
+            }
+            return future;
+          }
+        });
     service.init(conf);
     service.start();
     return service;
@@ -200,4 +225,40 @@ public class TestPerNodeTimelineCollectorsAuxService {
   private ContainerId getContainerId(long id) {
     return ContainerId.newContainerId(appAttemptId, id);
   }
+
+  @Test(timeout = 60000)
+  public void testRemoveAppWhenSecondAttemptAMCotainerIsLaunchedSameNode()
+      throws Exception {
+    // attempt 1: initialize its AM container, which adds the app collector
+    auxService = createCollectorAndAddApplication();
+    // auxService should have a single app
+    assertTrue(auxService.hasApplication(appAttemptId.getApplicationId()));
+
+    // attempt 2: initialize its AM container before attempt 1's is stopped
+    ContainerInitializationContext containerInitalizationContext =
+        createContainerInitalizationContext(2);
+    auxService.initializeContainer(containerInitalizationContext);
+
+    assertTrue("Applicatin not found in collectors.",
+        auxService.hasApplication(appAttemptId.getApplicationId()));
+
+    // first attempt stop container
+    ContainerTerminationContext context = createContainerTerminationContext(1);
+    auxService.stopContainer(context);
+
+    // 1st attempt container stopped; collector should still hold the app id
+    assertTrue("collector has removed application though 2nd attempt"
+            + " is running this node",
+        auxService.hasApplication(appAttemptId.getApplicationId()));
+
+    // second attempt stop container
+    context = createContainerTerminationContext(2);
+    auxService.stopContainer(context);
+
+    // auxService should not have that app
+    assertFalse("Application is not removed from collector",
+        auxService.hasApplication(appAttemptId.getApplicationId()));
+    auxService.close();
+  }
+
 }