Bläddra i källkod

HDFS-16844: Adds resiliency when StateStore gets exceptions. (#5138)

Allows the StateStore to stay up when there are errors reading the data.
Owen O'Malley 2 år sedan
förälder
incheckning
c71a68ca80

+ 6 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java

@@ -128,9 +128,13 @@ public class MembershipNamenodeResolver
     // Our cache depends on the store, update it first
     try {
       MembershipStore membership = getMembershipStore();
-      membership.loadCache(force);
+      if (!membership.loadCache(force)) {
+        return false;
+      }
       DisabledNameserviceStore disabled = getDisabledNameserviceStore();
-      disabled.loadCache(force);
+      if (!disabled.loadCache(force)) {
+        return false;
+      }
     } catch (IOException e) {
       LOG.error("Cannot update membership from the State Store", e);
     }

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java

@@ -398,7 +398,9 @@ public class MountTableResolver
     try {
       // Our cache depends on the store, update it first
       MountTableStore mountTable = this.getMountTableStore();
-      mountTable.loadCache(force);
+      if (!mountTable.loadCache(force)) {
+        return false;
+      }
 
       GetMountTableEntriesRequest request =
           GetMountTableEntriesRequest.newInstance("/");

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java

@@ -100,7 +100,7 @@ public abstract class CachedRecordStore<R extends BaseRecord>
    * @throws StateStoreUnavailableException If the cache is not initialized.
    */
   private void checkCacheAvailable() throws StateStoreUnavailableException {
-    if (!this.initialized) {
+    if (!getDriver().isDriverReady() || !this.initialized) {
       throw new StateStoreUnavailableException(
           "Cached State Store not initialized, " +
           getRecordClass().getSimpleName() + " records not valid");
@@ -125,7 +125,6 @@ public abstract class CachedRecordStore<R extends BaseRecord>
       } catch (IOException e) {
         LOG.error("Cannot get \"{}\" records from the State Store",
             getRecordClass().getSimpleName());
-        this.initialized = false;
         return false;
       }
 

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java

@@ -185,7 +185,9 @@ public class MembershipStoreImpl
 
   @Override
   public boolean loadCache(boolean force) throws IOException {
-    super.loadCache(force);
+    if (!super.loadCache(force)) {
+      return false;
+    }
 
     // Update local cache atomically
     cacheWriteLock.lock();

+ 139 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java

@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.records;
+
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
+import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreBaseImpl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * In-memory {@link StateStoreBaseImpl} for tests. Records are kept in one map
+ * per record name, keyed by primary key, and every data operation can be made
+ * to fail with an {@link IOException} through {@link #setGiveErrors}.
+ */
+public class MockStateStoreDriver extends StateStoreBaseImpl {
+  private boolean giveErrors = false;
+  private boolean initialized = false;
+  private final Map<String, Map<String, BaseRecord>> valueMap = new HashMap<>();
+
+  /** Marks the driver as ready; always succeeds. */
+  @Override
+  public boolean initDriver() {
+    initialized = true;
+    return true;
+  }
+
+  /** Nothing to prepare for an in-memory store; always succeeds. */
+  @Override
+  public <T extends BaseRecord> boolean initRecordStorage(String className,
+                                                          Class<T> clazz) {
+    return true;
+  }
+
+  /** @return true once {@link #initDriver()} ran and until {@link #close()}. */
+  @Override
+  public boolean isDriverReady() {
+    return initialized;
+  }
+
+  /** Drops every stored record and marks the driver as not ready. */
+  @Override
+  public void close() throws Exception {
+    valueMap.clear();
+    initialized = false;
+  }
+
+  /**
+   * Toggle failure injection for the data operations of this driver.
+   * @param value true to make the following calls throw IOException
+   */
+  public void setGiveErrors(boolean value) {
+    giveErrors = value;
+  }
+
+  /**
+   * Fail the current operation when failure injection is switched on.
+   * @throws IOException if giveErrors has been set
+   */
+  private void checkErrors() throws IOException {
+    if (!giveErrors) {
+      return;
+    }
+    throw new IOException("Induced errors");
+  }
+
+  /**
+   * Return a copy of all stored records of the given class, stamped with the
+   * current wall-clock time; the list is empty when nothing is stored.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz) throws IOException {
+    checkErrors();
+    String recordName = StateStoreUtils.getRecordName(clazz);
+    Map<String, BaseRecord> stored = valueMap.get(recordName);
+    List<T> results = new ArrayList<>();
+    if (stored != null) {
+      results.addAll((Collection<T>) stored.values());
+    }
+    return new QueryResult<>(results, System.currentTimeMillis());
+  }
+
+  /**
+   * Store the records one by one. An existing key is overwritten only when
+   * allowUpdate is set; otherwise it is skipped, or rejected with an
+   * IOException when errorIfExists is set.
+   */
+  @Override
+  public <T extends BaseRecord> boolean putAll(List<T> records,
+                                               boolean allowUpdate,
+                                               boolean errorIfExists)
+      throws IOException {
+    checkErrors();
+    for (T record : records) {
+      String recordName = StateStoreUtils.getRecordName(record.getClass());
+      Map<String, BaseRecord> stored =
+          valueMap.computeIfAbsent(recordName, k -> new HashMap<>());
+      String key = record.getPrimaryKey();
+      boolean alreadyStored = stored.get(key) != null;
+      if (alreadyStored && !allowUpdate) {
+        if (errorIfExists) {
+          throw new IOException("Record already exists for " + record.getClass()
+              + ": " + key);
+        }
+        // Existing record is kept untouched.
+        continue;
+      }
+      stored.put(key, record);
+    }
+    return true;
+  }
+
+  /** Remove every record of the given class; true if any map was dropped. */
+  @Override
+  public <T extends BaseRecord> boolean removeAll(Class<T> clazz) throws IOException {
+    checkErrors();
+    String recordName = StateStoreUtils.getRecordName(clazz);
+    Map<String, BaseRecord> dropped = valueMap.remove(recordName);
+    return dropped != null;
+  }
+
+  /**
+   * Remove the stored records of the given class that match the query.
+   * @return the number of records removed
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T extends BaseRecord> int remove(Class<T> clazz,
+                                           Query<T> query)
+      throws IOException {
+    checkErrors();
+    Map<String, BaseRecord> stored =
+        valueMap.get(StateStoreUtils.getRecordName(clazz));
+    if (stored == null) {
+      return 0;
+    }
+    int removed = 0;
+    Iterator<BaseRecord> cursor = stored.values().iterator();
+    while (cursor.hasNext()) {
+      BaseRecord candidate = cursor.next();
+      if (query.matches((T) candidate)) {
+        cursor.remove();
+        removed += 1;
+      }
+    }
+    return removed;
+  }
+}

+ 51 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestRouterState.java

@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,8 +20,16 @@ package org.apache.hadoop.hdfs.server.federation.store.records;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
+import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
 import org.junit.Test;
 
@@ -40,7 +48,7 @@ public class TestRouterState {
   private static final RouterServiceState STATE = RouterServiceState.RUNNING;
 
 
-  private RouterState generateRecord() throws IOException {
+  private RouterState generateRecord() {
     RouterState record = RouterState.newInstance(ADDRESS, START_TIME, STATE);
     record.setVersion(VERSION);
     record.setCompileInfo(COMPILE_INFO);
@@ -82,4 +90,45 @@ public class TestRouterState {
 
     validateRecord(newRecord);
   }
+
+  /**
+   * Checks that a failed cache refresh does not discard data that was loaded
+   * earlier. Two membership records for block pool "block1" are written to a
+   * {@link MockStateStoreDriver}, the caches are loaded, and then the driver
+   * is switched to throw IOExceptions before the caches are refreshed again.
+   * The cache update time must not move and both namenodes must still
+   * resolve.
+   */
+  @Test
+  public void testStateStoreResilience() throws Exception {
+    StateStoreService service = new StateStoreService();
+    Configuration conf = new Configuration();
+    conf.setClass(RBFConfigKeys.FEDERATION_STORE_DRIVER_CLASS,
+        MockStateStoreDriver.class,
+        StateStoreDriver.class);
+    conf.setBoolean(RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, false);
+    service.init(conf);
+    try {
+      MockStateStoreDriver driver = (MockStateStoreDriver) service.getDriver();
+      // Add two records for block1
+      driver.put(MembershipState.newInstance("routerId", "ns1",
+          "ns1-ha1", "cluster1", "block1", "rpc1",
+          "service1", "lifeline1", "https", "nn01",
+          FederationNamenodeServiceState.ACTIVE, false), false, false);
+      driver.put(MembershipState.newInstance("routerId", "ns1",
+          "ns1-ha2", "cluster1", "block1", "rpc2",
+          "service2", "lifeline2", "https", "nn02",
+          FederationNamenodeServiceState.STANDBY, false), false, false);
+      // load the cache
+      service.loadDriver();
+      MembershipNamenodeResolver resolver = new MembershipNamenodeResolver(conf, service);
+      service.refreshCaches(true);
+
+      // look up block1
+      List<? extends FederationNamenodeContext> result =
+          resolver.getNamenodesForBlockPoolId("block1");
+      assertEquals(2, result.size());
+
+      // cause io errors and then reload the cache
+      driver.setGiveErrors(true);
+      long previousUpdate = service.getCacheUpdateTime();
+      service.refreshCaches(true);
+      assertEquals(previousUpdate, service.getCacheUpdateTime());
+
+      // make sure the old cache is still there
+      result = resolver.getNamenodesForBlockPoolId("block1");
+      assertEquals(2, result.size());
+    } finally {
+      // Stop the service even when an assertion or call above fails, so a
+      // failing run does not leave the State Store service running.
+      service.stop();
+    }
+  }
 }