
YARN-2001. Added a time threshold for RM to wait before starting container allocations after restart/failover. Contributed by Jian He.

(cherry picked from commit 485c96e3cb9b0b05d6e490b4773506da83ebc61d)
Vinod Kumar Vavilapalli 10 years ago
parent commit d24ef142b7
10 files changed with 145 additions and 3 deletions
  1. hadoop-yarn-project/CHANGES.txt (+3, -0)
  2. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (+5, -0)
  3. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (+10, -0)
  4. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java (+3, -1)
  5. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java (+41, -2)
  6. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (+12, -0)
  7. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (+4, -0)
  8. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (+5, -0)
  9. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (+6, -0)
  10. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java (+56, -0)

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -205,6 +205,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2557. Add a parameter "attempt_Failures_Validity_Interval" into
     DistributedShell. (xgong)
 
+    YARN-2001. Added a time threshold for RM to wait before starting container
+    allocations after restart/failover. (Jian He via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -353,6 +353,11 @@ public class YarnConfiguration extends Configuration {
   public static final boolean DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED =
       false;
 
+  public static final String RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS =
+      RM_PREFIX + "work-preserving-recovery.scheduling-wait-ms";
+  public static final long DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS =
+      10000;
+
   /** Zookeeper interaction configs */
   public static final String RM_ZK_PREFIX = RM_PREFIX + "zk-";
 

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -297,6 +297,16 @@
     <value>false</value>
   </property>
 
+  <property>
+    <description>Set the amount of time RM waits before allocating new
+    containers on work-preserving-recovery. Such wait period gives RM a chance
+    to settle down resyncing with NMs in the cluster on recovery, before assigning
+    new containers to applications.
+    </description>
+    <name>yarn.resourcemanager.work-preserving-recovery.scheduling-wait-ms</name>
+    <value>10000</value>
+  </property>
+
   <property>
     <description>The class to use as the persistent store.
 

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java

@@ -108,4 +108,6 @@ public interface RMContext {
   boolean isWorkPreservingRecoveryEnabled();
   
   long getEpoch();
-}
+
+  boolean isSchedulerReadyForAllocatingContainers();
+}

+ 41 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java

@@ -21,6 +21,9 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.yarn.LocalConfigurationProvider;
@@ -44,6 +47,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -85,6 +90,13 @@ public class RMContextImpl implements RMContext {
   private SystemMetricsPublisher systemMetricsPublisher;
   private ConfigurationProvider configurationProvider;
   private long epoch;
+  private Clock systemClock = new SystemClock();
+  private long schedulerRecoveryStartTime = 0;
+  private long schedulerRecoveryWaitTime = 0;
+  private boolean printLog = true;
+  private boolean isSchedulerReady = false;
+
+  private static final Log LOG = LogFactory.getLog(RMContextImpl.class);
 
   /**
    * Default constructor. To be used in conjunction with setter methods for
@@ -379,7 +391,34 @@ public class RMContextImpl implements RMContext {
     return this.epoch;
   }
 
- void setEpoch(long epoch) {
+  void setEpoch(long epoch) {
     this.epoch = epoch;
   }
-}
+
+  public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
+    this.schedulerRecoveryStartTime = systemClock.getTime();
+    this.schedulerRecoveryWaitTime = waitTime;
+  }
+
+  public boolean isSchedulerReadyForAllocatingContainers() {
+    if (isSchedulerReady) {
+      return isSchedulerReady;
+    }
+    isSchedulerReady = (systemClock.getTime() - schedulerRecoveryStartTime)
+        > schedulerRecoveryWaitTime;
+    if (!isSchedulerReady && printLog) {
+      LOG.info("Skip allocating containers. Scheduler is waiting for recovery.");
+      printLog = false;
+    }
+    if (isSchedulerReady) {
+      LOG.info("Scheduler recovery is done. Start allocating new containers.");
+    }
+    return isSchedulerReady;
+  }
+
+  @Private
+  @VisibleForTesting
+  public void setSystemClock(Clock clock) {
+    this.systemClock = clock;
+  }
+}
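
The readiness check above is a latching elapsed-time gate: once the strict '>' comparison succeeds, isSchedulerReady stays true and the method returns early on every later call (and printLog ensures the "Skip allocating" message is logged only once). A minimal sketch of the timing behavior using ControlledClock, the same test clock the new test below injects via setSystemClock (the demo class is hypothetical):

import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock;

public class RecoveryGateDemo {
  public static void main(String[] args) {
    ControlledClock clock = new ControlledClock(new SystemClock());
    long start = clock.getTime();  // plays schedulerRecoveryStartTime
    long waitMs = 10000;           // plays schedulerRecoveryWaitTime

    clock.setTime(start + 5000);
    // Mirrors isSchedulerReadyForAllocatingContainers(): still waiting.
    System.out.println(clock.getTime() - start > waitMs); // false

    clock.setTime(start + waitMs + 1);
    System.out.println(clock.getTime() - start > waitMs); // true: ready
  }
}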

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -1131,6 +1131,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
 
     // recover applications
     rmAppManager.recover(state);
+
+    setSchedulerRecoveryStartAndWaitTime(state, conf);
   }
 
   public static void main(String argv[]) {
@@ -1178,6 +1180,16 @@ public class ResourceManager extends CompositeService implements Recoverable {
     rmContext.setDispatcher(rmDispatcher);
   }
 
+  private void setSchedulerRecoveryStartAndWaitTime(RMState state,
+      Configuration conf) {
+    if (!state.getApplicationState().isEmpty()) {
+      long waitTime =
+          conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
+            YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
+      rmContext.setSchedulerRecoveryStartAndWaitTime(waitTime);
+    }
+  }
+
   /**
    * Retrieve RM bind address from configuration
    * 

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -902,6 +902,10 @@ public class CapacityScheduler extends
   }
 
   private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
+    if (rmContext.isWorkPreservingRecoveryEnabled()
+        && !rmContext.isSchedulerReadyForAllocatingContainers()) {
+      return;
+    }
 
     // Assign new containers...
     // 1. Check for reserved applications

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -1015,6 +1015,11 @@ public class FairScheduler extends
   }
   
   private synchronized void attemptScheduling(FSSchedulerNode node) {
+    if (rmContext.isWorkPreservingRecoveryEnabled()
+        && !rmContext.isSchedulerReadyForAllocatingContainers()) {
+      return;
+    }
+
     // Assign new containers...
     // 1. Check for reserved applications
     // 2. Schedule if there are no reservations

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -702,6 +702,12 @@ public class FifoScheduler extends
           completedContainer, RMContainerEventType.FINISHED);
     }
 
+
+    if (rmContext.isWorkPreservingRecoveryEnabled()
+        && !rmContext.isSchedulerReadyForAllocatingContainers()) {
+      return;
+    }
+
     if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
             node.getAvailableResource(),minimumAllocation)) {
       LOG.debug("Node heartbeat " + rmNode.getNodeID() + 

+ 56 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java

@@ -37,10 +37,12 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
@@ -62,6 +64,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueu
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.util.ControlledClock;
+import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -479,6 +483,7 @@ public class TestWorkPreservingRMRestart {
   @Test(timeout = 20000)
   public void testAMfailedBetweenRMRestart() throws Exception {
     MemoryRMStateStore memStore = new MemoryRMStateStore();
+    conf.setLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 0);
     memStore.init(conf);
     rm1 = new MockRM(conf, memStore);
     rm1.start();
@@ -762,4 +767,55 @@ public class TestWorkPreservingRMRestart {
       Thread.sleep(200);
     }
   }
+
+  @Test (timeout = 20000)
+  public void testNewContainersNotAllocatedDuringSchedulerRecovery()
+      throws Exception {
+    conf.setLong(
+      YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 4000);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    RMApp app1 = rm1.submitApp(200);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // Restart RM
+    rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    nm1.registerNode();
+    ControlledClock clock = new ControlledClock(new SystemClock());
+    long startTime = System.currentTimeMillis();
+    ((RMContextImpl)rm2.getRMContext()).setSystemClock(clock);
+    am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
+    am1.registerAppAttempt(true);
+    rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+
+    // AM request for new containers
+    am1.allocate("127.0.0.1", 1000, 1, new ArrayList<ContainerId>());
+
+    List<Container> containers = new ArrayList<Container>();
+    clock.setTime(startTime + 2000);
+    nm1.nodeHeartbeat(true);
+
+    // sleep some time as allocation happens asynchronously.
+    Thread.sleep(3000);
+    containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
+      new ArrayList<ContainerId>()).getAllocatedContainers());
+    // container is not allocated during scheduling recovery.
+    Assert.assertTrue(containers.isEmpty());
+
+    clock.setTime(startTime + 8000);
+    nm1.nodeHeartbeat(true);
+    // Container is created after recovery is done.
+    while (containers.isEmpty()) {
+      containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>()).getAllocatedContainers());
+      Thread.sleep(500);
+    }
+  }
 }