Browse Source

HDFS-17532. RBF: Allow router state store cache update to overwrite and delete in parallel (#6839)

Felix Nguyen 11 months ago
parent
commit
74d30a5dce

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java

@@ -218,6 +218,9 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
       FEDERATION_STORE_PREFIX + "driver.class";
       FEDERATION_STORE_PREFIX + "driver.class";
   public static final Class<? extends StateStoreDriver>
   public static final Class<? extends StateStoreDriver>
       FEDERATION_STORE_DRIVER_CLASS_DEFAULT = StateStoreZooKeeperImpl.class;
       FEDERATION_STORE_DRIVER_CLASS_DEFAULT = StateStoreZooKeeperImpl.class;
+  public static final String FEDERATION_STORE_DRIVER_ASYNC_OVERRIDE_MAX_THREADS =
+      FEDERATION_STORE_PREFIX + "driver.async.override.max.threads";
+  public static final int FEDERATION_STORE_DRIVER_ASYNC_OVERRIDE_MAX_THREADS_DEFAULT = -1;
 
 
   public static final String FEDERATION_STORE_CONNECTION_TEST_MS =
   public static final String FEDERATION_STORE_CONNECTION_TEST_MS =
       FEDERATION_STORE_PREFIX + "connection.test";
       FEDERATION_STORE_PREFIX + "connection.test";

+ 7 - 12
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java

@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -173,7 +172,7 @@ public abstract class CachedRecordStore<R extends BaseRecord>
    */
    */
   public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
   public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
     List<R> commitRecords = new ArrayList<>();
     List<R> commitRecords = new ArrayList<>();
-    List<R> toDeleteRecords = new ArrayList<>();
+    List<R> deleteRecords = new ArrayList<>();
     List<R> newRecords = query.getRecords();
     List<R> newRecords = query.getRecords();
     long currentDriverTime = query.getTimestamp();
     long currentDriverTime = query.getTimestamp();
     if (newRecords == null || currentDriverTime <= 0) {
     if (newRecords == null || currentDriverTime <= 0) {
@@ -184,22 +183,18 @@ public abstract class CachedRecordStore<R extends BaseRecord>
       if (record.shouldBeDeleted(currentDriverTime)) {
       if (record.shouldBeDeleted(currentDriverTime)) {
         String recordName = StateStoreUtils.getRecordName(record.getClass());
         String recordName = StateStoreUtils.getRecordName(record.getClass());
         LOG.info("State Store record to delete {}: {}", recordName, record);
         LOG.info("State Store record to delete {}: {}", recordName, record);
-        toDeleteRecords.add(record);
+        deleteRecords.add(record);
       } else if (!record.isExpired() && record.checkExpired(currentDriverTime)) {
       } else if (!record.isExpired() && record.checkExpired(currentDriverTime)) {
         String recordName = StateStoreUtils.getRecordName(record.getClass());
         String recordName = StateStoreUtils.getRecordName(record.getClass());
         LOG.info("Override State Store record {}: {}", recordName, record);
         LOG.info("Override State Store record {}: {}", recordName, record);
         commitRecords.add(record);
         commitRecords.add(record);
       }
       }
     }
     }
-    if (commitRecords.size() > 0) {
-      getDriver().putAll(commitRecords, true, false);
-    }
-    if (!toDeleteRecords.isEmpty()) {
-      for (Map.Entry<R, Boolean> entry : getDriver().removeMultiple(toDeleteRecords).entrySet()) {
-        if (entry.getValue()) {
-          newRecords.remove(entry.getKey());
-        }
-      }
+    List<R> removedRecords = getDriver().handleOverwriteAndDelete(commitRecords, deleteRecords);
+    // In driver async mode, driver will return null and skip the next block.
+    // newRecords might be stale as a result but will sort itself out the next override cycle.
+    if (removedRecords != null && !removedRecords.isEmpty()) {
+      newRecords.removeAll(removedRecords);
     }
     }
   }
   }
 
 

+ 88 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java

@@ -17,13 +17,22 @@
  */
  */
 package org.apache.hadoop.hdfs.server.federation.store.driver;
 package org.apache.hadoop.hdfs.server.federation.store.driver;
 
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetAddress;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
 import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
@@ -54,6 +63,9 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations {
   /** State Store metrics. */
   /** State Store metrics. */
   private StateStoreMetrics metrics;
   private StateStoreMetrics metrics;
 
 
+  /** Thread pool to delegate overwrite and deletion asynchronously. */
+  private ThreadPoolExecutor executor = null;
+
   /**
   /**
    * Initialize the state store connection.
    * Initialize the state store connection.
    *
    *
@@ -88,6 +100,18 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations {
         return false;
         return false;
       }
       }
     }
     }
+
+    int nThreads = conf.getInt(
+        RBFConfigKeys.FEDERATION_STORE_DRIVER_ASYNC_OVERRIDE_MAX_THREADS,
+        RBFConfigKeys.FEDERATION_STORE_DRIVER_ASYNC_OVERRIDE_MAX_THREADS_DEFAULT);
+    if (nThreads > 0) {
+      executor = new ThreadPoolExecutor(nThreads, nThreads, 1L, TimeUnit.MINUTES,
+          new LinkedBlockingQueue<>());
+      executor.allowCoreThreadTimeOut(true);
+      LOG.info("Init StateStoreDriver in async mode with {} threads.", nThreads);
+    } else {
+      LOG.info("Init StateStoreDriver in sync mode.");
+    }
     return true;
     return true;
   }
   }
 
 
@@ -169,7 +193,12 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations {
    *
    *
    * @throws Exception if something goes wrong while closing the state store driver connection.
    * @throws Exception if something goes wrong while closing the state store driver connection.
    */
    */
-  public abstract void close() throws Exception;
+  public void close() throws Exception {
+    if (executor != null) {
+      executor.shutdown();
+      executor = null;
+    }
+  }
 
 
   /**
   /**
    * Returns the current time synchronization from the underlying store.
    * Returns the current time synchronization from the underlying store.
@@ -206,4 +235,62 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations {
     }
     }
     return hostname;
     return hostname;
   }
   }
+
+  /**
+   * Try to overwrite records in commitRecords and remove records in deleteRecords.
+   * Should return null if async mode is used. Else return removed records.
+   * @param commitRecords records to overwrite in state store
+   * @param deleteRecords records to remove from state store
+   * @param <R> record class
+   * @throws IOException when there is a failure during overwriting or deletion
+   * @return null if async mode is used, else removed records
+   */
+  public <R extends BaseRecord> List<R> handleOverwriteAndDelete(List<R> commitRecords,
+      List<R> deleteRecords) throws IOException {
+    List<R> result = null;
+    try {
+      // Overwrite all expired records.
+      if (commitRecords != null && !commitRecords.isEmpty()) {
+        Runnable overwriteCallable =
+            () -> {
+              try {
+                putAll(commitRecords, true, false);
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+            };
+        if (executor != null) {
+          executor.execute(overwriteCallable);
+        } else {
+          overwriteCallable.run();
+        }
+      }
+
+      // Delete all deletable records.
+      if (deleteRecords != null && !deleteRecords.isEmpty()) {
+        Map<R, Boolean> removedRecords = new HashMap<>();
+        Runnable deletionCallable = () -> {
+          try {
+            removedRecords.putAll(removeMultiple(deleteRecords));
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        };
+        if (executor != null) {
+          executor.execute(deletionCallable);
+        } else {
+          result = new ArrayList<>();
+          deletionCallable.run();
+          for (Map.Entry<R, Boolean> entry : removedRecords.entrySet()) {
+            if (entry.getValue()) {
+              result.add(entry.getKey());
+            }
+          }
+        }
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+    return result;
+  }
 }
 }

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java

@@ -201,6 +201,7 @@ public abstract class StateStoreFileBaseImpl
 
 
   @Override
   @Override
   public void close() throws Exception {
   public void close() throws Exception {
+    super.close();
     if (this.concurrentStoreAccessPool != null) {
     if (this.concurrentStoreAccessPool != null) {
       this.concurrentStoreAccessPool.shutdown();
       this.concurrentStoreAccessPool.shutdown();
       boolean isTerminated = this.concurrentStoreAccessPool.awaitTermination(5, TimeUnit.SECONDS);
       boolean isTerminated = this.concurrentStoreAccessPool.awaitTermination(5, TimeUnit.SECONDS);

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java

@@ -125,6 +125,7 @@ public class StateStoreMySQLImpl extends StateStoreSerializableImpl {
 
 
   @Override
   @Override
   public void close() throws Exception {
   public void close() throws Exception {
+    super.close();
     connectionFactory.shutdown();
     connectionFactory.shutdown();
   }
   }
 
 

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java

@@ -140,6 +140,7 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
 
 
   @Override
   @Override
   public void close() throws Exception {
   public void close() throws Exception {
+    super.close();
     if (executorService != null) {
     if (executorService != null) {
       executorService.shutdown();
       executorService.shutdown();
     }
     }

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml

@@ -370,6 +370,16 @@
     </description>
     </description>
   </property>
   </property>
 
 
+  <property>
+    <name>dfs.federation.router.store.driver.async.override.max.threads</name>
+    <value>-1</value>
+    <description>
+      Number of threads used by StateStoreDriver to overwrite and delete records asynchronously.
+      Only used by MembershipStore and RouterStore. Non-positive values will make StateStoreDriver
+      run in sync mode.
+    </description>
+  </property>
+
   <property>
   <property>
     <name>dfs.federation.router.store.connection.test</name>
     <name>dfs.federation.router.store.connection.test</name>
     <value>60000</value>
     <value>60000</value>

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md

@@ -469,6 +469,7 @@ The connection to the State Store and the internal caching at the Router.
 | dfs.federation.router.store.connection.test | 60000 | How often to check for the connection to the State Store in milliseconds. |
 | dfs.federation.router.store.connection.test | 60000 | How often to check for the connection to the State Store in milliseconds. |
 | dfs.federation.router.cache.ttl | 60000 | How often to refresh the State Store caches in milliseconds. |
 | dfs.federation.router.cache.ttl | 60000 | How often to refresh the State Store caches in milliseconds. |
 | dfs.federation.router.store.membership.expiration | 300000 | Expiration time in milliseconds for a membership record. |
 | dfs.federation.router.store.membership.expiration | 300000 | Expiration time in milliseconds for a membership record. |
+| dfs.federation.router.store.driver.async.override.max.threads |  | Number of threads to overwrite and delete records asynchronously when overriding. |
 | dfs.federation.router.mount-table.cache.update | false | If true, Mount table cache is updated whenever a mount table entry is added, modified or removed for all the routers. |
 | dfs.federation.router.mount-table.cache.update | false | If true, Mount table cache is updated whenever a mount table entry is added, modified or removed for all the routers. |
 | dfs.federation.router.mount-table.cache.update.timeout | 1m | Max time to wait for all the routers to finish their mount table cache update. |
 | dfs.federation.router.mount-table.cache.update.timeout | 1m | Max time to wait for all the routers to finish their mount table cache update. |
 | dfs.federation.router.mount-table.cache.update.client.max.time | 5m | Max time a RouterClient connection can be cached. |
 | dfs.federation.router.mount-table.cache.update.client.max.time | 5m | Max time a RouterClient connection can be cached. |

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java

@@ -58,6 +58,7 @@ public class MockStateStoreDriver extends StateStoreBaseImpl {
 
 
   @Override
   @Override
   public void close() throws Exception {
   public void close() throws Exception {
+    super.close();
     VALUE_MAP.clear();
     VALUE_MAP.clear();
     initialized = false;
     initialized = false;
   }
   }