소스 검색

YARN-11436. [Federation] MemoryFederationStateStore Support Version. (#5518)

slfan1989 2 년 전
부모
커밋
69b90b5698

+ 37 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateVersionIncompatibleException.java

@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.exception;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+public class FederationStateVersionIncompatibleException extends YarnException {
+
+  private static final long serialVersionUID = 1L;
+
+  public FederationStateVersionIncompatibleException(Throwable cause) {
+    super(cause);
+  }
+
+  public FederationStateVersionIncompatibleException(String message) {
+    super(message);
+  }
+
+  public FederationStateVersionIncompatibleException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

+ 32 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java

@@ -31,17 +31,18 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.Comparator;
 
-import org.apache.commons.lang3.NotImplementedException;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateVersionIncompatibleException;
 import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
 import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
@@ -97,6 +98,7 @@ import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembership
 import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
 import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
 import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
 import org.apache.hadoop.yarn.util.MonotonicClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -116,6 +118,9 @@ public class MemoryFederationStateStore implements FederationStateStore {
   private int maxAppsInStateStore;
   private AtomicInteger sequenceNum;
   private AtomicInteger masterKeyId;
+  private static final Version CURRENT_VERSION_INFO = Version
+      .newInstance(1, 1);
+  private byte[] version;
 
   private final MonotonicClock clock = new MonotonicClock();
 
@@ -134,6 +139,7 @@ public class MemoryFederationStateStore implements FederationStateStore {
         YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS);
     sequenceNum = new AtomicInteger();
     masterKeyId = new AtomicInteger();
+    version = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
   }
 
   @Override
@@ -367,22 +373,43 @@ public class MemoryFederationStateStore implements FederationStateStore {
 
   @Override
   public Version getCurrentVersion() {
-    throw new NotImplementedException("Code is not implemented");
+    return CURRENT_VERSION_INFO;
   }
 
   @Override
   public Version loadVersion() throws Exception {
-    throw new NotImplementedException("Code is not implemented");
+    if (version != null) {
+      VersionProto versionProto = VersionProto.parseFrom(version);
+      return new VersionPBImpl(versionProto);
+    }
+    return null;
   }
 
   @Override
   public void storeVersion() throws Exception {
-    throw new NotImplementedException("Code is not implemented");
+    version = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
   }
 
   @Override
   public void checkVersion() throws Exception {
-    throw new NotImplementedException("Code is not implemented");
+    Version loadedVersion = loadVersion();
+    LOG.info("Loaded Router State Version Info = {}.", loadedVersion);
+    Version currentVersion = getCurrentVersion();
+    if (loadedVersion != null && loadedVersion.equals(currentVersion)) {
+      return;
+    }
+    // if there is no version info, treat it as CURRENT_VERSION_INFO;
+    if (loadedVersion == null) {
+      loadedVersion = currentVersion;
+    }
+    if (loadedVersion.isCompatibleTo(currentVersion)) {
+      LOG.info("Storing Router State Version Info {}.", currentVersion);
+      storeVersion();
+    } else {
+      throw new FederationStateVersionIncompatibleException(
+          "Expecting Router state version " + currentVersion +
+          ", but loading version " + loadedVersion);
+    }
   }
 
   @Override

+ 37 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java

@@ -27,6 +27,8 @@ import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
 import org.apache.hadoop.yarn.server.federation.store.records.RouterRMDTSecretManagerState;
 import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.junit.Test;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -88,4 +90,39 @@ public class TestMemoryFederationStateStore extends FederationStateStoreBaseTest
     assertTrue(tokenIdentifier instanceof RMDelegationTokenIdentifier);
     assertEquals(identifier, tokenIdentifier);
   }
+
+  @Test
+  public void testGetCurrentVersion() {
+    MemoryFederationStateStore memoryStateStore =
+        MemoryFederationStateStore.class.cast(this.getStateStore());
+    Version version = memoryStateStore.getCurrentVersion();
+    assertEquals(version.getMajorVersion(), 1);
+    assertEquals(version.getMinorVersion(), 1);
+  }
+
+  @Test
+  public void testStoreVersion() throws Exception {
+    MemoryFederationStateStore memoryStateStore =
+        MemoryFederationStateStore.class.cast(this.getStateStore());
+    memoryStateStore.storeVersion();
+    Version version = memoryStateStore.getCurrentVersion();
+    assertEquals(version.getMajorVersion(), 1);
+    assertEquals(version.getMinorVersion(), 1);
+  }
+
+  @Test
+  public void testLoadVersion() throws Exception {
+    MemoryFederationStateStore memoryStateStore =
+        MemoryFederationStateStore.class.cast(this.getStateStore());
+    Version version = memoryStateStore.loadVersion();
+    assertEquals(version.getMajorVersion(), 1);
+    assertEquals(version.getMinorVersion(), 1);
+  }
+
+  @Test
+  public void testCheckVersion() throws Exception {
+    MemoryFederationStateStore memoryStateStore =
+        MemoryFederationStateStore.class.cast(this.getStateStore());
+    memoryStateStore.checkVersion();
+  }
 }