Browse Source

HDFS-17506. [FGL] Performance for phase 1

ZanderXu 4 months ago
parent
commit
e8807726b8

+ 322 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/fgl/FSNLockBenchmarkThroughput.java

@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode.fgl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * This class benchmarks the throughput of NN for both global-lock and fine-grained lock.
+ * Using some common used RPCs, such as create,addBlock,complete,append,rename,
+ * delete,setPermission,setOwner,setReplication,getFileInfo,getListing,getBlockLocation,
+ * to build some tasks according to readWrite ratio and testing count.
+ * Then create a thread pool with a concurrency of the numClient to perform these tasks.
+ * The performance difference between Global Lock and Fine-grained Lock can be
+ * obtained according to the execution time.
+ */
+public class FSNLockBenchmarkThroughput extends Configured implements Tool {
+
+  private final FileSystem fileSystem;
+
+  public FSNLockBenchmarkThroughput(FileSystem fileSystem) {
+    this.fileSystem = fileSystem;
+  }
+
+  public void benchmark(Path basePath, int readWriteRatio, int testingCount,
+      int numClients) throws Exception {
+    // private final
+    ArrayList<Path> readingPaths = new ArrayList<>();
+    for (int i = 0; i < 30; i++) {
+      Path path = new Path(basePath, "reading_" + i);
+      internalWriteFile(this.fileSystem, path, true, null);
+      readingPaths.add(path);
+    }
+
+    HashMap<String, Integer> detailInfo = new HashMap<>();
+
+    // building tasks.
+    List<Callable<Void>> tasks = buildTasks(this.fileSystem,
+        basePath, testingCount, readWriteRatio, readingPaths, detailInfo);
+    Collections.shuffle(tasks);
+
+    long startTime = System.currentTimeMillis();
+
+    ExecutorService executors = Executors.newFixedThreadPool(numClients);
+    try {
+      // Submit all tasks.
+      List<Future<Void>> futures = executors.invokeAll(tasks);
+
+      // Waiting result
+      for (Future<Void> f : futures) {
+        f.get();
+      }
+
+      long endTime = System.currentTimeMillis();
+      // Print result.
+      System.out.println("The Benchmark result is: " + tasks.size()
+          + " tasks with readWriteRatio " + readWriteRatio + " completed, taking "
+          + (endTime - startTime) + "(ms)");
+      detailInfo.forEach(
+          (k, v) -> System.out.println("\t operationName:" + k + ", testCount:" + v));
+    } finally {
+      executors.shutdown();
+      executors.shutdownNow();
+
+      for (Path path : readingPaths) {
+        this.fileSystem.delete(path, false);
+      }
+    }
+  }
+
+  // Write a little data to the path.
+  private void internalWriteFile(FileSystem fs, Path path, boolean writeData,
+      HashMap<String, Integer> detailInfo) throws IOException {
+    try (FSDataOutputStream outputStream = fs.create(path)) {
+      incOp(detailInfo, "create");
+      incOp(detailInfo, "complete");
+      byte[] data = new byte[1024];
+      ThreadLocalRandom.current().nextBytes(data);
+      if (writeData) {
+        outputStream.write(data);
+        incOp(detailInfo, "addBlock");
+      }
+    }
+  }
+
+  // Append a little data to the path
+  private void internalAppendFile(FileSystem fs, Path path, boolean writeData,
+      HashMap<String, Integer> detailInfo) throws IOException {
+    try (FSDataOutputStream outputStream = fs.append(path)) {
+      incOp(detailInfo, "append");
+      incOp(detailInfo, "complete");
+      byte[] data = new byte[1024];
+      ThreadLocalRandom.current().nextBytes(data);
+      if (writeData) {
+        outputStream.write(data);
+      }
+    }
+  }
+
+  /**
+   * Include getListing.
+   */
+  private Callable<Void> getListing(FileSystem fs, Path path,
+      HashMap<String, Integer> detailInfo) {
+    return () -> {
+      fs.listStatus(path);
+      incOp(detailInfo, "getListing");
+      return null;
+    };
+  }
+
+  private synchronized void incOp(HashMap<String, Integer> detailInfo, String opName) {
+    if (detailInfo != null) {
+      int value = detailInfo.getOrDefault(opName, 0);
+      detailInfo.put(opName, value + 1);
+    }
+  }
+
+  /**
+   * Include getBlockLocation.
+   */
+  private Callable<Void> getBlockLocation(FileSystem fs, Path path,
+      HashMap<String, Integer> detailInfo) {
+    return () -> {
+      fs.getFileBlockLocations(path, 0, Long.MAX_VALUE);
+      incOp(detailInfo, "getBlockLocation");
+      return null;
+    };
+  }
+
+  /**
+   * Include getFileInfo.
+   */
+  private Callable<Void> getFileInfo(FileSystem fs, Path path,
+      HashMap<String, Integer> detailInfo) {
+    return () -> {
+      fs.getFileStatus(path);
+      incOp(detailInfo, "getFileInfo");
+      return null;
+    };
+  }
+
+  /**
+   * Include create, addBlock, complete.
+   */
+  private Callable<Void> writeAndDeleteFile(FileSystem fs, Path path,
+      HashMap<String, Integer> detailInfo) {
+    return () -> {
+      internalWriteFile(fs, path, false, detailInfo);
+      fs.delete(path, false);
+      incOp(detailInfo, "delete");
+      return null;
+    };
+  }
+
+  /**
+   * Include create, addBlock, complete, append, complete, rename, setPermission, setOwner,
+   * setReplication and delete.
+   */
+  private Callable<Void> otherWriteOperation(FileSystem fs, Path path,
+      Path renameTargetPath, HashMap<String, Integer> detailInfo) {
+    return () -> {
+      // Create one file
+      internalWriteFile(fs, path, false, detailInfo);
+
+      // Append some data
+      internalAppendFile(fs, path, true, detailInfo);
+
+      // Rename
+      fs.rename(path, renameTargetPath);
+      incOp(detailInfo, "rename");
+
+      // SetPermission
+      fs.setPermission(renameTargetPath, FsPermission.getDirDefault());
+      incOp(detailInfo, "setPermission");
+
+      // SetOwner
+      fs.setOwner(renameTargetPath, "mock_user", "mock_group");
+      incOp(detailInfo, "setOwner");
+
+      // SetReplication
+      fs.setReplication(renameTargetPath, (short) 4);
+      incOp(detailInfo, "setReplication");
+
+      // Delete
+      fs.delete(renameTargetPath, false);
+      incOp(detailInfo, "delete");
+
+      return null;
+    };
+  }
+
+  /**
+   * Building some callable tasks according to testingCount and readWriteRatio.
+   */
+  private List<Callable<Void>> buildTasks(FileSystem fs, Path basePath, int testingCount,
+      int readWriteRatio, ArrayList<Path> readingPaths, HashMap<String, Integer> detailInfo) {
+    List<Callable<Void>> tasks = new ArrayList<>();
+
+    for (int i = 0; i < testingCount; i++) {
+      // contains 4 * 10 write RPCs.
+      for (int j = 0; j < 10; j++) {
+        Path path = new Path(basePath, "write_" + i + "_" + j);
+        tasks.add(writeAndDeleteFile(fs, path, detailInfo));
+      }
+
+      // contains 10 write RPCs.
+      Path srcPath = new Path(basePath, "src_" + i + "_" + System.nanoTime());
+      Path targetPath = new Path(basePath, "target_" + i + "_" + System.nanoTime());
+      tasks.add(otherWriteOperation(fs, srcPath, targetPath, detailInfo));
+
+      // The number of write RPCs is 50.
+      for (int j = 0; j < readWriteRatio * 50; j++) {
+        int opNumber = ThreadLocalRandom.current().nextInt(5);
+        int pathIndex = ThreadLocalRandom.current().nextInt(readingPaths.size());
+        Path path = readingPaths.get(pathIndex);
+        if (opNumber <= 1) {
+          tasks.add(getFileInfo(fs, path, detailInfo));
+        } else if (opNumber <= 3) {
+          tasks.add(getBlockLocation(fs, path, detailInfo));
+        } else {
+          tasks.add(getListing(fs, path, detailInfo));
+        }
+      }
+    }
+    return tasks;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    String basePath = "/tmp/fsnlock/benchmark/throughput";
+    int readWriteRatio = 20;
+    int testingCount = 100;
+    int numClients = 100;
+
+    if (args.length >= 4) {
+      basePath = args[0];
+
+      try {
+        readWriteRatio = Integer.parseInt(args[1]);
+        if (readWriteRatio <= 0) {
+          printUsage("Invalid readWrite ratio: " + readWriteRatio);
+        }
+      } catch (NumberFormatException e) {
+        printUsage("Invalid readWrite ratio: " + e.getMessage());
+      }
+
+      try {
+        testingCount = Integer.parseInt(args[2]);
+        if (testingCount <= 0) {
+          printUsage("Invalid testing count: " + testingCount);
+        }
+      } catch (NumberFormatException e) {
+        printUsage("Invalid testing count: " + e.getMessage());
+      }
+
+      try {
+        numClients = Integer.parseInt(args[3]);
+        if (numClients <= 0) {
+          printUsage("Invalid num of clients: " + numClients);
+        }
+      } catch (NumberFormatException e) {
+        printUsage("Invalid num of clients: " + e.getMessage());
+      }
+    } else {
+      printUsage(null);
+    }
+
+    benchmark(new Path(basePath), readWriteRatio, testingCount, numClients);
+    return 0;
+  }
+
+  private static void printUsage(String msg) {
+    if (msg != null) {
+      System.out.println(msg);
+    }
+    System.err.println("Usage: FSNLockBenchmarkThroughput " +
+        "<base path> <read write ratio> <testing count> <num clients>");
+    System.exit(1);
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    FileSystem fs = FileSystem.get(conf);
+    int res = ToolRunner.run(conf, new FSNLockBenchmarkThroughput(fs), args);
+    System.exit(res);
+  }
+}

+ 104 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/fgl/TestFSNLockBenchmarkThroughput.java

@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode.fgl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * To test {@link FSNLockBenchmarkThroughput}.
+ */
+public class TestFSNLockBenchmarkThroughput {
+
+  @Test
+  public void testFineGrainedLockingBenchmarkThroughput1() throws Exception {
+    testBenchmarkThroughput(true, 20, 100, 1000);
+  }
+
+  @Test
+  public void testFineGrainedLockingBenchmarkThroughput2() throws Exception {
+    testBenchmarkThroughput(true, 20, 1000, 1000);
+  }
+
+  @Test
+  public void testFineGrainedLockingBenchmarkThroughput3() throws Exception {
+    testBenchmarkThroughput(true, 10, 100, 100);
+  }
+
+  @Test
+  public void testGlobalLockingBenchmarkThroughput1() throws Exception {
+    testBenchmarkThroughput(false, 20, 100, 1000);
+  }
+
+  @Test
+  public void testGlobalLockingBenchmarkThroughput2() throws Exception {
+    testBenchmarkThroughput(false, 20, 1000, 1000);
+  }
+
+  @Test
+  public void testGlobalLockingBenchmarkThroughput3() throws Exception {
+    testBenchmarkThroughput(false, 10, 100, 100);
+  }
+
+  private void testBenchmarkThroughput(boolean enableFGL, int readWriteRatio,
+      int testingCount, int numClients) throws Exception {
+    MiniQJMHACluster qjmhaCluster = null;
+
+    try {
+      Configuration conf = new HdfsConfiguration();
+      conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
+      conf.setInt(DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY, 500);
+      if (enableFGL) {
+        conf.setClass(DFSConfigKeys.DFS_NAMENODE_LOCK_MODEL_PROVIDER_KEY,
+            FineGrainedFSNamesystemLock.class, FSNLockManager.class);
+      } else {
+        conf.setClass(DFSConfigKeys.DFS_NAMENODE_LOCK_MODEL_PROVIDER_KEY,
+            GlobalFSNamesystemLock.class, FSNLockManager.class);
+      }
+
+      MiniQJMHACluster.Builder builder = new MiniQJMHACluster.Builder(conf);
+      builder.getDfsBuilder().numDataNodes(10);
+      qjmhaCluster = builder.build();
+      MiniDFSCluster cluster = qjmhaCluster.getDfsCluster();
+
+      cluster.transitionToActive(0);
+      cluster.waitActive(0);
+
+      FileSystem fileSystem = cluster.getFileSystem(0);
+
+      String[] args = new String[]{"/tmp/fsnlock/benchmark/throughput",
+          String.valueOf(readWriteRatio), String.valueOf(testingCount),
+          String.valueOf(numClients)};
+
+      Assert.assertEquals(0, ToolRunner.run(conf,
+          new FSNLockBenchmarkThroughput(fileSystem), args));
+    } finally {
+      if (qjmhaCluster != null) {
+        qjmhaCluster.shutdown();
+      }
+    }
+  }
+}