YARN-10760. Number of allocated OPPORTUNISTIC containers can dip below 0 (#3642)

Andrew Chung committed 3 years ago · parent commit d3f0b7eab7

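Before this change, the release path decremented the allocated-OPPORTUNISTIC gauge unconditionally, while containers recovered after an RM restart (and containers demoted to OPPORTUNISTIC) never incremented it, so the gauge could dip below zero. The fix makes the mutating methods report whether they actually changed state and moves each metric update behind that result. A minimal standalone sketch of that accounting pattern (illustrative class and method names, not Hadoop code):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    // Minimal sketch (not Hadoop code; names are invented) of the pattern
    // this commit adopts: the gauge moves only when the underlying map
    // actually changed, so a duplicate release can never drive it negative.
    public class GuardedGauge {
      private final Map<String, Object> live = new ConcurrentHashMap<>();
      private final AtomicInteger allocated = new AtomicInteger();

      public boolean add(String id, Object container) {
        boolean added = live.putIfAbsent(id, container) == null;
        if (added) {
          allocated.incrementAndGet(); // increment only on a real insert
        }
        return added;
      }

      public boolean remove(String id) {
        boolean removed = live.remove(id) != null;
        if (removed) {
          allocated.decrementAndGet(); // decrement only on a real removal
        }
        return removed;
      }

      public int value() {
        return allocated.get();
      }

      public static void main(String[] args) {
        GuardedGauge gauge = new GuardedGauge();
        gauge.add("container_1", new Object());
        gauge.remove("container_1");
        gauge.remove("container_1");       // duplicate release is a no-op
        System.out.println(gauge.value()); // prints 0, never -1
      }
    }
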
+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/OpportunisticSchedulerMetrics.java

@@ -63,6 +63,18 @@ public class OpportunisticSchedulerMetrics {
     return INSTANCE;
   }
 
+  @VisibleForTesting
+  public static void resetMetrics() {
+    synchronized (OpportunisticSchedulerMetrics.class) {
+      isInitialized.set(false);
+      INSTANCE = null;
+      MetricsSystem ms = DefaultMetricsSystem.instance();
+      if (ms != null) {
+        ms.unregisterSource("OpportunisticSchedulerMetrics");
+      }
+    }
+  }
+
   private static void registerMetrics() {
     registry = new MetricsRegistry(RECORD_INFO);
     registry.tag(RECORD_INFO, "ResourceManager");

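OpportunisticSchedulerMetrics is a JVM-wide singleton registered with DefaultMetricsSystem, so its counters would otherwise leak from one test case into the next. A sketch of how a JUnit 4 test might use the new hook (hypothetical test class; it mirrors the teardown added to the test file below):

    import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
    import org.junit.After;

    // Hypothetical test class, shown only to illustrate the new hook.
    public class MySchedulerMetricsTest {
      @After
      public void tearDown() {
        // Drop the singleton and unregister its metrics source so the
        // next test case starts from zeroed counters.
        OpportunisticSchedulerMetrics.resetMetrics();
      }
    }
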
+ 13 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -584,8 +584,14 @@ public abstract class AbstractYarnScheduler
             rmContainer);
 
         // recover scheduler attempt
-        schedulerAttempt.recoverContainer(schedulerNode, rmContainer);
+        final boolean recovered = schedulerAttempt.recoverContainer(
+            schedulerNode, rmContainer);
 
+        if (recovered && rmContainer.getExecutionType() ==
+            ExecutionType.OPPORTUNISTIC) {
+          OpportunisticSchedulerMetrics.getMetrics()
+              .incrAllocatedOppContainers(1);
+        }
         // set master container for the current running AMContainer for this
         // attempt.
         RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt();
@@ -720,7 +726,10 @@ public abstract class AbstractYarnScheduler
       SchedulerApplicationAttempt schedulerAttempt =
           getCurrentAttemptForContainer(containerId);
       if (schedulerAttempt != null) {
-        schedulerAttempt.removeRMContainer(containerId);
+        if (schedulerAttempt.removeRMContainer(containerId)) {
+          OpportunisticSchedulerMetrics.getMetrics()
+              .incrReleasedOppContainers(1);
+        }
       }
       LOG.debug("Completed container: {} in state: {} event:{}",
           rmContainer.getContainerId(), rmContainer.getState(), event);
@@ -729,7 +738,6 @@ public abstract class AbstractYarnScheduler
       if (node != null) {
         node.releaseContainer(rmContainer.getContainerId(), false);
       }
-      OpportunisticSchedulerMetrics.getMetrics().incrReleasedOppContainers(1);
     }
 
     // If the container is getting killed in ACQUIRED state, the requester (AM
@@ -1411,6 +1419,8 @@ public abstract class AbstractYarnScheduler
             RMContainer demotedRMContainer =
                 createDemotedRMContainer(appAttempt, oppCntxt, rmContainer);
             if (demotedRMContainer != null) {
+              OpportunisticSchedulerMetrics.getMetrics()
+                  .incrAllocatedOppContainers(1);
               appAttempt.addToNewlyDemotedContainers(
                       uReq.getContainerId(), demotedRMContainer);
             }

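The last hunk above covers demotion: a GUARANTEED container demoted to OPPORTUNISTIC now increments the allocated-opportunistic gauge, balancing the decrement taken when that container later completes. For context, a hedged sketch of the AM-side request that exercises this path, assuming the standard UpdateContainerRequest/AllocateRequest builder API (the helper class and names are hypothetical):

    import java.util.Collections;
    import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
    import org.apache.hadoop.yarn.api.records.Container;
    import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
    import org.apache.hadoop.yarn.api.records.ExecutionType;
    import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;

    // Hypothetical helper, not part of this commit: builds the AM-side
    // allocate request that triggers the demotion path patched above.
    public final class DemotionRequests {
      static AllocateRequest buildDemotionRequest(Container container) {
        UpdateContainerRequest demotion = UpdateContainerRequest.newInstance(
            container.getVersion(), container.getId(),
            ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
            null /* capability unchanged */, ExecutionType.OPPORTUNISTIC);
        return AllocateRequest.newBuilder()
            .updateRequests(Collections.singletonList(demotion))
            .build();
      }
    }
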
+ 15 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java

@@ -402,7 +402,13 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     }
   }
 
-  public void removeRMContainer(ContainerId containerId) {
+  /**
+   * Removes an RM container from the map of live containers
+   * related to this application attempt.
+   * @param containerId The container ID of the RMContainer to remove
+   * @return true if the container was present in the map and removed
+   */
+  public boolean removeRMContainer(ContainerId containerId) {
     writeLock.lock();
     try {
       RMContainer rmContainer = liveContainers.remove(containerId);
@@ -415,7 +421,11 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
           this.attemptResourceUsageAllocatedRemotely
               .decUsed(rmContainer.getAllocatedResource());
         }
+
+        return true;
       }
+
+      return false;
     } finally {
       writeLock.unlock();
     }
@@ -1226,7 +1236,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     }
   }
 
-  public void recoverContainer(SchedulerNode node,
+  public boolean recoverContainer(SchedulerNode node,
       RMContainer rmContainer) {
     writeLock.lock();
     try {
@@ -1234,7 +1244,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       appSchedulingInfo.recoverContainer(rmContainer, node.getPartition());
 
       if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
-        return;
+        return false;
       }
       LOG.info("SchedulerAttempt " + getApplicationAttemptId()
           + " is recovering container " + rmContainer.getContainerId());
@@ -1244,6 +1254,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
             rmContainer.getContainer().getResource());
       }
 
+      return true;
+
       // resourceLimit: updated when LeafQueue#recoverContainer#allocateResource
       // is called.
       // newlyAllocatedContainers.add(rmContainer);

+ 4 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java

@@ -666,11 +666,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   }
 
   @Override
-  public synchronized void recoverContainer(SchedulerNode node,
+  public synchronized boolean recoverContainer(SchedulerNode node,
       RMContainer rmContainer) {
     writeLock.lock();
     try {
-      super.recoverContainer(node, rmContainer);
+      final boolean recovered = super.recoverContainer(node, rmContainer);
 
       if (!rmContainer.getState().equals(RMContainerState.COMPLETED)) {
         getQueue().incUsedResource(rmContainer.getContainer().getResource());
@@ -685,6 +685,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
         getQueue().addAMResourceUsage(resource);
         setAmRunning(true);
       }
+
+      return recovered;
     } finally {
       writeLock.unlock();
     }

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java

@@ -534,6 +534,17 @@ public class MockRM extends ResourceManager {
     return nm;
   }
 
+  public MockNM registerNode(String nodeIdStr, int memory, int vCores,
+      List<ApplicationId> runningApplications,
+      List<NMContainerStatus> containerStatuses) throws Exception {
+    MockNM nm =
+        new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService(),
+            YarnVersionInfo.getVersion());
+    nm.registerNode(containerStatuses, runningApplications);
+    drainEventsImplicitly();
+    return nm;
+  }
+
   public MockNM registerNode(String nodeIdStr, Resource nodeCapability)
       throws Exception {
     MockNM nm = new MockNM(nodeIdStr, nodeCapability,

+ 134 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java

@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -72,6 +73,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSche
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
 import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -105,6 +107,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 
@@ -132,6 +135,11 @@ public class TestOpportunisticContainerAllocatorAMService {
         YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
     conf.setInt(
         YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100);
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.setBoolean(
+        YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
+    conf.set(
+        YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
     startRM(conf);
   }
 
@@ -165,6 +173,8 @@ public class TestOpportunisticContainerAllocatorAMService {
     if (rm != null) {
       rm.stop();
     }
+
+    OpportunisticSchedulerMetrics.resetMetrics();
   }
 
   @Test(timeout = 600000)
@@ -817,6 +827,130 @@ public class TestOpportunisticContainerAllocatorAMService {
         metrics.getAggregatedReleasedContainers());
   }
 
+  /**
+   * Tests that, if a node has running opportunistic containers while the RM
+   * is down, the RM reflects those containers in its metrics
+   * upon recovery.
+   */
+  @Test
+  public void testMetricsRetainsAllocatedOpportunisticAfterRMRestart()
+      throws Exception {
+    final MockRMAppSubmissionData appSubmissionData =
+        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+            .withAppName("app")
+            .withUser("user")
+            .withAcls(null)
+            .withQueue("default")
+            .build();
+
+    MockNM nm1 = new MockNM("h:1234", 4096, rm.getResourceTrackerService());
+    nm1.registerNode();
+
+    final RMApp app = MockRMAppSubmitter.submit(rm, appSubmissionData);
+
+    final ApplicationAttemptId appAttemptId =
+        app.getCurrentAppAttempt().getAppAttemptId();
+
+    MockRM.launchAndRegisterAM(app, rm, nm1);
+
+    final OpportunisticSchedulerMetrics metrics =
+        OpportunisticSchedulerMetrics.getMetrics();
+
+    // We start with ID 2, since AMContainer is ID 1
+    final ContainerId recoverOContainerId2 = ContainerId.newContainerId(
+        appAttemptId, 2);
+
+    final Resource fakeResource = Resource.newInstance(1024, 1);
+    final String fakeDiagnostics = "recover container";
+    final Priority fakePriority = Priority.newInstance(0);
+
+    final NMContainerStatus recoverOContainerReport1 =
+        NMContainerStatus.newInstance(
+            recoverOContainerId2, 0, ContainerState.RUNNING,
+            fakeResource, fakeDiagnostics, 0,
+            fakePriority, 0, null,
+            ExecutionType.OPPORTUNISTIC, -1);
+
+    // Make sure that the counters start at 0
+    Assert.assertEquals(0, metrics.getAllocatedContainers());
+
+    // Recover one OContainer only
+    rm.registerNode("h2:1234", 4096, 1,
+        Collections.singletonList(
+            appAttemptId.getApplicationId()),
+        Collections.singletonList(recoverOContainerReport1));
+
+    Assert.assertEquals(1, metrics.getAllocatedContainers());
+
+    // Recover two OContainers at once
+    final ContainerId recoverOContainerId3 = ContainerId.newContainerId(
+        appAttemptId, 3);
+
+    final ContainerId recoverOContainerId4 = ContainerId.newContainerId(
+        appAttemptId, 4);
+
+    final NMContainerStatus recoverOContainerReport2 =
+        NMContainerStatus.newInstance(
+            recoverOContainerId3, 0, ContainerState.RUNNING,
+            fakeResource, fakeDiagnostics, 0,
+            fakePriority, 0, null,
+            ExecutionType.OPPORTUNISTIC, -1);
+
+    final NMContainerStatus recoverOContainerReport3 =
+        NMContainerStatus.newInstance(
+            recoverOContainerId4, 0, ContainerState.RUNNING,
+            fakeResource, fakeDiagnostics, 0,
+            fakePriority, 0, null,
+            ExecutionType.OPPORTUNISTIC, -1);
+
+    rm.registerNode(
+        "h3:1234", 4096, 10,
+        Collections.singletonList(
+            appAttemptId.getApplicationId()),
+        Arrays.asList(recoverOContainerReport2, recoverOContainerReport3));
+
+    Assert.assertEquals(3, metrics.getAllocatedContainers());
+
+    // Make sure that the recovered GContainer
+    // does not increment OContainer count
+    final ContainerId recoverGContainerId = ContainerId.newContainerId(
+        appAttemptId, 5);
+
+    final NMContainerStatus recoverGContainerReport =
+        NMContainerStatus.newInstance(
+            recoverGContainerId, 0, ContainerState.RUNNING,
+            fakeResource, fakeDiagnostics, 0,
+            fakePriority, 0, null,
+            ExecutionType.GUARANTEED, -1);
+
+    rm.registerNode(
+        "h4:1234", 4096, 10,
+        Collections.singletonList(
+            appAttemptId.getApplicationId()),
+        Collections.singletonList(recoverGContainerReport));
+
+    Assert.assertEquals(3, metrics.getAllocatedContainers());
+
+    final ContainerId completedOContainerId = ContainerId.newContainerId(
+        appAttemptId, 6);
+
+    final NMContainerStatus completedOContainerReport =
+        NMContainerStatus.newInstance(
+            completedOContainerId, 0, ContainerState.COMPLETE,
+            fakeResource, fakeDiagnostics, 0,
+            fakePriority, 0, null,
+            ExecutionType.OPPORTUNISTIC, -1);
+
+    // Tests that completed containers are not recorded
+    rm.registerNode(
+        "h5:1234", 4096, 10,
+        Collections.singletonList(
+            appAttemptId.getApplicationId()),
+        Collections.singletonList(completedOContainerReport));
+
+    Assert.assertEquals(3, metrics.getAllocatedContainers());
+  }
+
   @Test(timeout = 60000)
   public void testAMCrashDuringAllocate() throws Exception {
     MockNM nm = new MockNM("h:1234", 4096, rm.getResourceTrackerService());