Browse Source

YARN-3094. Reset timer for liveness monitors after RM recovery. Contributed by Jun Gong
(cherry picked from commit 0af6a99a3fcfa4b47d3bcba5e5cc5fe7b312a152)

(cherry picked from commit 61466809552f96a83aa19446d4d59cecd0d2cad5)
(cherry picked from commit ab654746fbad2da12b24b13425dc9bf17c46b50c)

Jian He 10 năm trước cách đây
mục cha
commit
a703952d39

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -72,6 +72,9 @@ Release 2.6.1 - UNRELEASED
     YARN-3103. AMRMClientImpl does not update AMRM token properly. (Jason Lowe
     via jianhe)
 
+    YARN-3094. Reset timer for liveness monitors after RM recovery. (Jun Gong
+    via jianhe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java

@@ -59,6 +59,7 @@ public abstract class AbstractLivelinessMonitor<O> extends AbstractService {
   @Override
   protected void serviceStart() throws Exception {
     assert !stopped : "starting when already stopped";
+    resetTimer();
     checkerThread = new Thread(new PingChecker());
     checkerThread.setName("Ping Checker");
     checkerThread.start();
@@ -99,6 +100,13 @@ public abstract class AbstractLivelinessMonitor<O> extends AbstractService {
     running.remove(ob);
   }
 
+  public synchronized void resetTimer() {
+    long time = clock.getTime();
+    for (O ob : running.keySet()) {
+      running.put(ob, time);
+    }
+  }
+
   private class PingChecker implements Runnable {
 
     @Override

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -571,12 +571,14 @@ public class ResourceManager extends CompositeService implements Recoverable {
 
       if(recoveryEnabled) {
         try {
+          LOG.info("Recovery started");
           rmStore.checkVersion();
           if (rmContext.isWorkPreservingRecoveryEnabled()) {
             rmContext.setEpoch(rmStore.getAndIncrementEpoch());
           }
           RMState state = rmStore.loadState();
           recover(state);
+          LOG.info("Recovery ended");
         } catch (Exception e) {
           // the Exception from loadState() needs to be handled for
           // HA and we need to give up master status if we got fenced

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
+import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 
 public class AMLivelinessMonitor extends AbstractLivelinessMonitor<ApplicationAttemptId> {
@@ -35,6 +36,11 @@ public class AMLivelinessMonitor extends AbstractLivelinessMonitor<ApplicationAt
     this.dispatcher = d.getEventHandler();
   }
 
+  public AMLivelinessMonitor(Dispatcher d, Clock clock) {
+    super("AMLivelinessMonitor", clock);
+    this.dispatcher = d.getEventHandler();
+  }
+
   public void serviceInit(Configuration conf) throws Exception {
     super.serviceInit(conf);
     int expireIntvl = conf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,

+ 81 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestAMLivelinessMonitor.java

@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.util.ControlledClock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+
+public class TestAMLivelinessMonitor {
+
+  @Test(timeout = 10000)
+  public void testResetTimer() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    UserGroupInformation.setConfiguration(conf);
+    conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
+    conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 6000);
+    final ControlledClock clock = new ControlledClock(new SystemClock());
+    clock.setTime(0);
+    MemoryRMStateStore memStore = new MemoryRMStateStore() {
+      @Override
+      public synchronized RMState loadState() throws Exception {
+        clock.setTime(8000);
+        return super.loadState();
+      }
+    };
+    memStore.init(conf);
+    final ApplicationAttemptId attemptId = mock(ApplicationAttemptId.class);
+    final Dispatcher dispatcher = mock(Dispatcher.class);
+    final boolean[] expired = new boolean[]{false};
+    final AMLivelinessMonitor monitor = new AMLivelinessMonitor(
+        dispatcher, clock) {
+      @Override
+      protected void expire(ApplicationAttemptId id) {
+        Assert.assertEquals(id, attemptId);
+        expired[0] = true;
+      }
+    };
+    monitor.register(attemptId);
+    MockRM rm = new MockRM(conf, memStore) {
+      @Override
+      protected AMLivelinessMonitor createAMLivelinessMonitor() {
+        return monitor;
+      }
+    };
+    rm.start();
+    // make sure that monitor has started
+    while (monitor.getServiceState() != Service.STATE.STARTED) {
+      Thread.sleep(100);
+    }
+    // expired[0] would be set to true without resetTimer
+    Assert.assertFalse(expired[0]);
+    rm.stop();
+  }
+}