فهرست منبع

HDFS-16967. RBF: File based state stores should allow concurrent access to the records (#5523)

Reviewed-by: Inigo Goiri <inigoiri@apache.org>
Reviewed-by: Simbarashe Dzinamarira <sdzinamarira@linkedin.com>
Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
Viraj Jasani 2 سال پیش
والد
کامیت
937caf7de9

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java

@@ -255,6 +255,15 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
   public static final int FEDERATION_STORE_ZK_ASYNC_MAX_THREADS_DEFAULT =
       -1;
 
+  // HDFS Router-based federation File based store implementation specific configs
+  public static final String FEDERATION_STORE_FILE_ASYNC_THREADS =
+      FEDERATION_STORE_PREFIX + "driver.file.async.threads";
+  public static final int FEDERATION_STORE_FILE_ASYNC_THREADS_DEFAULT = 0;
+
+  public static final String FEDERATION_STORE_FS_ASYNC_THREADS =
+      FEDERATION_STORE_PREFIX + "driver.fs.async.threads";
+  public static final int FEDERATION_STORE_FS_ASYNC_THREADS_DEFAULT = 0;
+
   // HDFS Router safe mode
   public static final String DFS_ROUTER_SAFEMODE_ENABLE =
       FEDERATION_ROUTER_PREFIX + "safemode.enable";

+ 154 - 43
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java

@@ -25,14 +25,24 @@ import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
@@ -69,6 +79,8 @@ public abstract class StateStoreFileBaseImpl
   /** If it is initialized. */
   private boolean initialized = false;
 
+  private ExecutorService concurrentStoreAccessPool;
+
 
   /**
    * Get the reader of a record for the file system.
@@ -137,6 +149,8 @@ public abstract class StateStoreFileBaseImpl
    */
   protected abstract String getRootDir();
 
+  protected abstract int getConcurrentFilesAccessNumThreads();
+
   /**
    * Set the driver as initialized.
    *
@@ -168,9 +182,31 @@ public abstract class StateStoreFileBaseImpl
       return false;
     }
     setInitialized(true);
+    int threads = getConcurrentFilesAccessNumThreads();
+    if (threads > 1) {
+      this.concurrentStoreAccessPool =
+          new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
+              new LinkedBlockingQueue<>(),
+              new ThreadFactoryBuilder()
+                  .setNameFormat("state-store-file-based-concurrent-%d")
+                  .setDaemon(true).build());
+      LOG.info("File based state store will be accessed concurrently with {} max threads", threads);
+    } else {
+      LOG.info("File based state store will be accessed serially");
+    }
     return true;
   }
 
+  @Override
+  public void close() throws Exception {
+    if (this.concurrentStoreAccessPool != null) {
+      this.concurrentStoreAccessPool.shutdown();
+      boolean isTerminated = this.concurrentStoreAccessPool.awaitTermination(5, TimeUnit.SECONDS);
+      LOG.info("Concurrent store access pool is terminated: {}", isTerminated);
+      this.concurrentStoreAccessPool = null;
+    }
+  }
+
   @Override
   public <T extends BaseRecord> boolean initRecordStorage(
       String className, Class<T> recordClass) {
@@ -198,22 +234,29 @@ public abstract class StateStoreFileBaseImpl
     verifyDriverReady();
     long start = monotonicNow();
     StateStoreMetrics metrics = getMetrics();
-    List<T> ret = new ArrayList<>();
+    List<T> result = Collections.synchronizedList(new ArrayList<>());
     try {
       String path = getPathForClass(clazz);
       List<String> children = getChildren(path);
-      for (String child : children) {
-        String pathRecord = path + "/" + child;
-        if (child.endsWith(TMP_MARK)) {
-          LOG.debug("There is a temporary file {} in {}", child, path);
-          if (isOldTempRecord(child)) {
-            LOG.warn("Removing {} as it's an old temporary record", child);
-            remove(pathRecord);
-          }
-        } else {
-          T record = getRecord(pathRecord, clazz);
-          ret.add(record);
+      List<Callable<Void>> callables = new ArrayList<>();
+      children.forEach(child -> callables.add(
+          () -> getRecordsFromFileAndRemoveOldTmpRecords(clazz, result, path, child)));
+      if (this.concurrentStoreAccessPool != null) {
+        // Read records concurrently
+        List<Future<Void>> futures = this.concurrentStoreAccessPool.invokeAll(callables);
+        for (Future<Void> future : futures) {
+          future.get();
         }
+      } else {
+        // Read records serially
+        callables.forEach(e -> {
+          try {
+            e.call();
+          } catch (Exception ex) {
+            LOG.error("Failed to retrieve record using file operations.", ex);
+            throw new RuntimeException(ex);
+          }
+        });
       }
     } catch (Exception e) {
       if (metrics != null) {
@@ -227,7 +270,37 @@ public abstract class StateStoreFileBaseImpl
     if (metrics != null) {
       metrics.addRead(monotonicNow() - start);
     }
-    return new QueryResult<T>(ret, getTime());
+    return new QueryResult<>(result, getTime());
+  }
+
+  /**
+   * Get the state store record from the given path (path/child) and add the record to the
+   * result list.
+   *
+   * @param clazz Class of the record.
+   * @param result The list that collects the records read. A record is added to it unless the
+   * given path represents a temp file.
+   * @param path The parent path.
+   * @param child The child path under the parent path. Together, path and child form the file
+   * location of the given record.
+   * @param <T> Record class of the records.
+   * @return Always null; the Void return type allows this method to be used as a Callable.
+   * @throws IOException If the file read operation fails.
+   */
+  private <T extends BaseRecord> Void getRecordsFromFileAndRemoveOldTmpRecords(Class<T> clazz,
+      List<T> result, String path, String child) throws IOException {
+    String pathRecord = path + "/" + child;
+    if (child.endsWith(TMP_MARK)) {
+      LOG.debug("There is a temporary file {} in {}", child, path);
+      if (isOldTempRecord(child)) {
+        LOG.warn("Removing {} as it's an old temporary record", child);
+        remove(pathRecord);
+      }
+    } else {
+      T record = getRecord(pathRecord, clazz);
+      result.add(record);
+    }
+    return null;
   }
 
   /**
@@ -260,23 +333,17 @@ public abstract class StateStoreFileBaseImpl
    */
   private <T extends BaseRecord> T getRecord(
       final String path, final Class<T> clazz) throws IOException {
-    BufferedReader reader = getReader(path);
-    try {
+    try (BufferedReader reader = getReader(path)) {
       String line;
       while ((line = reader.readLine()) != null) {
         if (!line.startsWith("#") && line.length() > 0) {
           try {
-            T record = newRecord(line, clazz, false);
-            return record;
+            return newRecord(line, clazz, false);
           } catch (Exception ex) {
             LOG.error("Cannot parse line {} in file {}", line, path, ex);
           }
         }
       }
-    } finally {
-      if (reader != null) {
-        reader.close();
-      }
     }
     throw new IOException("Cannot read " + path + " for record " +
         clazz.getSimpleName());
@@ -330,13 +397,12 @@ public abstract class StateStoreFileBaseImpl
           record.setDateModified(this.getTime());
           toWrite.put(recordPath, record);
         } else if (errorIfExists) {
-          LOG.error("Attempt to insert record {} that already exists",
-              recordPath);
+          LOG.error("Attempt to insert record {} that already exists", recordPath);
           if (metrics != null) {
             metrics.addFailure(monotonicNow() - start);
           }
           return false;
-        } else  {
+        } else {
           LOG.debug("Not updating {}", record);
         }
       } else {
@@ -345,36 +411,81 @@ public abstract class StateStoreFileBaseImpl
     }
 
     // Write the records
-    boolean success = true;
-    for (Entry<String, T> entry : toWrite.entrySet()) {
-      String recordPath = entry.getKey();
-      String recordPathTemp = recordPath + "." + now() + TMP_MARK;
-      boolean recordWrittenSuccessfully = true;
-      try (BufferedWriter writer = getWriter(recordPathTemp)) {
-        T record = entry.getValue();
-        String line = serializeString(record);
-        writer.write(line);
-      } catch (IOException e) {
-        LOG.error("Cannot write {}", recordPathTemp, e);
-        recordWrittenSuccessfully = false;
-        success = false;
+    final AtomicBoolean success = new AtomicBoolean(true);
+    final List<Callable<Void>> callables = new ArrayList<>();
+    toWrite.entrySet().forEach(entry -> callables.add(() -> writeRecordToFile(success, entry)));
+    if (this.concurrentStoreAccessPool != null) {
+      // Write records concurrently
+      List<Future<Void>> futures = null;
+      try {
+        futures = this.concurrentStoreAccessPool.invokeAll(callables);
+      } catch (InterruptedException e) {
+        success.set(false);
+        LOG.error("Failed to put record concurrently.", e);
       }
-      // Commit
-      if (recordWrittenSuccessfully && !rename(recordPathTemp, recordPath)) {
-        LOG.error("Failed committing record into {}", recordPath);
-        success = false;
+      if (futures != null) {
+        for (Future<Void> future : futures) {
+          try {
+            future.get();
+          } catch (InterruptedException | ExecutionException e) {
+            success.set(false);
+            LOG.error("Failed to retrieve results from concurrent record put runs.", e);
+          }
+        }
       }
+    } else {
+      // Write records serially
+      callables.forEach(callable -> {
+        try {
+          callable.call();
+        } catch (Exception e) {
+          success.set(false);
+          LOG.error("Failed to put record.", e);
+        }
+      });
     }
 
     long end = monotonicNow();
     if (metrics != null) {
-      if (success) {
+      if (success.get()) {
         metrics.addWrite(end - start);
       } else {
         metrics.addFailure(end - start);
       }
     }
-    return success;
+    return success.get();
+  }
+
+  /**
+   * Writes the state store record to the file. At first, the record is written to a temp location
+   * and then later renamed to the final location that is passed with the entry key.
+   *
+   * @param success The atomic boolean that gets updated to false if the file write operation fails.
+   * @param entry The entry of the record path and the state store record to be written to the file
+   * by first writing to a temp location and then renaming it to the record path.
+   * @param <T> Record class of the records.
+   * @return Always null; the Void return type allows this method to be used as a Callable.
+   */
+  private <T extends BaseRecord> Void writeRecordToFile(AtomicBoolean success,
+      Entry<String, T> entry) {
+    String recordPath = entry.getKey();
+    String recordPathTemp = recordPath + "." + now() + TMP_MARK;
+    boolean recordWrittenSuccessfully = true;
+    try (BufferedWriter writer = getWriter(recordPathTemp)) {
+      T record = entry.getValue();
+      String line = serializeString(record);
+      writer.write(line);
+    } catch (IOException e) {
+      LOG.error("Cannot write {}", recordPathTemp, e);
+      recordWrittenSuccessfully = false;
+      success.set(false);
+    }
+    // Commit
+    if (recordWrittenSuccessfully && !rename(recordPathTemp, recordPath)) {
+      LOG.error("Failed committing record into {}", recordPath);
+      success.set(false);
+    }
+    return null;
   }
 
   @Override

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java

@@ -109,6 +109,12 @@ public class StateStoreFileImpl extends StateStoreFileBaseImpl {
     return this.rootDirectory;
   }
 
+  @Override
+  protected int getConcurrentFilesAccessNumThreads() {
+    return getConf().getInt(RBFConfigKeys.FEDERATION_STORE_FILE_ASYNC_THREADS,
+        RBFConfigKeys.FEDERATION_STORE_FILE_ASYNC_THREADS_DEFAULT);
+  }
+
   @Override
   protected <T extends BaseRecord> BufferedReader getReader(String filename) {
     BufferedReader reader = null;
@@ -144,6 +150,7 @@ public class StateStoreFileImpl extends StateStoreFileBaseImpl {
 
   @Override
   public void close() throws Exception {
+    super.close();
     setInitialized(false);
   }
 

+ 8 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java

@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
 /**
  * {@link StateStoreDriver} implementation based on a filesystem. The common
  * implementation uses HDFS as a backend. The path can be specified setting
- * dfs.federation.router.driver.fs.path=hdfs://host:port/path/to/store.
+ * dfs.federation.router.store.driver.fs.path=hdfs://host:port/path/to/store.
  */
 public class StateStoreFileSystemImpl extends StateStoreFileBaseImpl {
 
@@ -117,8 +117,15 @@ public class StateStoreFileSystemImpl extends StateStoreFileBaseImpl {
     return this.workPath;
   }
 
+  @Override
+  protected int getConcurrentFilesAccessNumThreads() {
+    return getConf().getInt(RBFConfigKeys.FEDERATION_STORE_FS_ASYNC_THREADS,
+        RBFConfigKeys.FEDERATION_STORE_FS_ASYNC_THREADS_DEFAULT);
+  }
+
   @Override
   public void close() throws Exception {
+    super.close();
     if (fs != null) {
       fs.close();
     }

+ 28 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml

@@ -894,4 +894,32 @@
       If this is below 0, the auto-refresh is disabled.
     </description>
   </property>
+
+  <property>
+    <name>dfs.federation.router.store.driver.file.async.threads</name>
+    <value>0</value>
+    <description>
+      Max threads used by StateStoreFileImpl to access state store files concurrently.
+      The only class currently being supported:
+      org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl.
+      Default value is 0, which means StateStoreFileImpl would work in sync mode, meaning it
+      would access one file at a time.
+      Use a value greater than 1 to enable concurrent file access.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.store.driver.fs.async.threads</name>
+    <value>0</value>
+    <description>
+      Max threads used by StateStoreFileSystemImpl to access state store files from the given
+      filesystem concurrently.
+      The only class currently being supported:
+      org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileSystemImpl.
+      Default value is 0, which means StateStoreFileSystemImpl would work in sync mode, meaning it
+      would access one file from the filesystem at a time.
+      Use a value greater than 1 to enable concurrent file access from the given filesystem.
+    </description>
+  </property>
+
 </configuration>

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java

@@ -94,6 +94,7 @@ public class TestStateStoreDriverBase {
   public static void tearDownCluster() {
     if (stateStore != null) {
       stateStore.stop();
+      stateStore = null;
     }
   }
 

+ 28 - 4
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFile.java

@@ -18,31 +18,55 @@
 package org.apache.hadoop.hdfs.server.federation.store.driver;
 
 import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_STORE_FILE_ASYNC_THREADS;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl;
+
+import org.junit.After;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
  * Test the file-based ({@link StateStoreFileImpl}) implementation of the State Store driver.
  */
+@RunWith(Parameterized.class)
 public class TestStateStoreFile extends TestStateStoreDriverBase {
 
-  @BeforeClass
-  public static void setupCluster() throws Exception {
+  private final String numFileAsyncThreads;
+
+  public TestStateStoreFile(String numFileAsyncThreads) {
+    this.numFileAsyncThreads = numFileAsyncThreads;
+  }
+
+  @Parameterized.Parameters(name = "numFileAsyncThreads-{0}")
+  public static List<String[]> data() {
+    return Arrays.asList(new String[][] {{"20"}, {"0"}});
+  }
+
+  private static void setupCluster(String numFsAsyncThreads) throws Exception {
     Configuration conf = getStateStoreConfiguration(StateStoreFileImpl.class);
+    conf.setInt(FEDERATION_STORE_FILE_ASYNC_THREADS, Integer.parseInt(numFsAsyncThreads));
     getStateStore(conf);
   }
 
   @Before
-  public void startup() throws IOException {
+  public void startup() throws Exception {
+    setupCluster(numFileAsyncThreads);
     removeAll(getStateStoreDriver());
   }
 
+  @After
+  public void tearDown() throws Exception {
+    tearDownCluster();
+  }
+
   @Test
   public void testInsert()
       throws IllegalArgumentException, IllegalAccessException, IOException {

+ 33 - 14
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreFileSystem.java

@@ -19,6 +19,8 @@ package org.apache.hadoop.hdfs.server.federation.store.driver;
 
 import java.io.BufferedWriter;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -26,12 +28,15 @@ import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUt
 import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileBaseImpl;
 import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileSystemImpl;
 import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
-import org.junit.AfterClass;
+
+import org.junit.After;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.mockito.stubbing.Answer;
 
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_STORE_FS_ASYNC_THREADS;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
@@ -41,16 +46,22 @@ import static org.mockito.Mockito.spy;
 /**
  * Test the FileSystem (e.g., HDFS) implementation of the State Store driver.
  */
+@RunWith(Parameterized.class)
 public class TestStateStoreFileSystem extends TestStateStoreDriverBase {
 
   private static MiniDFSCluster dfsCluster;
 
-  @BeforeClass
-  public static void setupCluster() throws Exception {
-    Configuration conf = FederationStateStoreTestUtils
-        .getStateStoreConfiguration(StateStoreFileSystemImpl.class);
-    conf.set(StateStoreFileSystemImpl.FEDERATION_STORE_FS_PATH,
-        "/hdfs-federation/");
+  private final String numFsAsyncThreads;
+
+  public TestStateStoreFileSystem(String numFsAsyncThreads) {
+    this.numFsAsyncThreads = numFsAsyncThreads;
+  }
+
+  private static void setupCluster(String numFsAsyncThreads) throws Exception {
+    Configuration conf =
+        FederationStateStoreTestUtils.getStateStoreConfiguration(StateStoreFileSystemImpl.class);
+    conf.set(StateStoreFileSystemImpl.FEDERATION_STORE_FS_PATH, "/hdfs-federation/");
+    conf.setInt(FEDERATION_STORE_FS_ASYNC_THREADS, Integer.parseInt(numFsAsyncThreads));
 
     // Create HDFS cluster to back the state store
     MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
@@ -60,18 +71,26 @@ public class TestStateStoreFileSystem extends TestStateStoreDriverBase {
     getStateStore(conf);
   }
 
-  @AfterClass
-  public static void tearDownCluster() {
-    if (dfsCluster != null) {
-      dfsCluster.shutdown();
-    }
+  @Parameterized.Parameters(name = "numFsAsyncThreads-{0}")
+  public static List<String[]> data() {
+    return Arrays.asList(new String[][] {{"20"}, {"0"}});
   }
 
   @Before
-  public void startup() throws IOException {
+  public void startup() throws Exception {
+    setupCluster(numFsAsyncThreads);
     removeAll(getStateStoreDriver());
   }
 
+  @After
+  public void tearDown() throws Exception {
+    tearDownCluster();
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+  }
+
   @Test
   public void testInsert()
       throws IllegalArgumentException, IllegalAccessException, IOException {