
HDFS-8968. Erasure coding: a comprehensive I/O throughput benchmark tool. Contributed by Rui Li.

Change-Id: I01ff5b04727fd79e6373582d4815c5e7b2096c67
Zhe Zhang, 9 years ago
commit 7b00c8e20e

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -201,6 +201,9 @@ Trunk (Unreleased)
     HDFS-9234. WebHdfs: getContentSummary() should give quota for storage types.
     (Surendra Singh Lilhore via xyao)
 
+    HDFS-8968. Erasure coding: a comprehensive I/O throughput benchmark tool.
+    (Rui Li via zhz)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 423 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ErasureCodeBenchmarkThroughput.java

@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
+import org.apache.hadoop.util.StopWatch;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class benchmarks the throughput of client read/write for both replica
+ * and Erasure Coding.
+ * <p/>
+ * Currently 4 operations are supported: read, write, generate and clean up
+ * data.
+ * Users must specify an operation, the amount of data in MB for a single
+ * client, and which storage policy to use, i.e. EC or replication.
+ * Optionally, users can specify the number of clients to launch concurrently.
+ * The tool launches one thread per client; the number of clients defaults
+ * to 1.
+ * For reading, users can also specify whether stateful or positional read
+ * should be used. Stateful read is the default.
+ * <p/>
+ * Each client reads and writes its own files.
+ * For writing, each client writes a temporary file of the desired size, and
+ * the file is cleaned up when the test finishes.
+ * For reading, each client tries to read the file specific to itself, and
+ * simply returns if no such file exists. Users should therefore generate
+ * the files before testing read. Generating data is essentially the same as
+ * writing, except that the files are not cleared at the end.
+ * For example, to test reading 1024MB of data with 10 clients, first
+ * generate 1024MB of data with 10 (or more) clients.
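+ * <p/>
+ * A typical session might look like the following; the arguments mirror
+ * {@code printUsage}, while the exact launch command depends on how this
+ * test class is packaged and invoked:
+ * <pre>
+ *   ErasureCodeBenchmarkThroughput gen 1024 ec 10
+ *   ErasureCodeBenchmarkThroughput read 1024 ec 10 pos
+ *   ErasureCodeBenchmarkThroughput clean 1024 ec
+ * </pre>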
+ */
+public class ErasureCodeBenchmarkThroughput extends Configured implements Tool {
+
+  private static final int BUFFER_SIZE_MB = 128;
+  private static final String DFS_TMP_DIR = System.getProperty(
+      "test.benchmark.data", "/tmp/benchmark/data");
+  public static final String REP_DIR = DFS_TMP_DIR + "/replica";
+  public static final String EC_DIR = DFS_TMP_DIR + "/ec";
+  private static final String REP_FILE_BASE = "rep-file-";
+  private static final String EC_FILE_BASE = "ec-file-";
+  private static final String TMP_FILE_SUFFIX = ".tmp";
+  private static final ErasureCodingPolicy ecPolicy =
+      ErasureCodingPolicyManager.getSystemDefaultPolicy();
+  private static final byte[] data = new byte[BUFFER_SIZE_MB * 1024 * 1024];
+
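+  // Fill the shared write buffer with random bytes once up front, so every
+  // client writes realistic data without paying a per-write generation cost.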
+  static {
+    Random random = new Random();
+    random.nextBytes(data);
+  }
+
+  private final FileSystem fs;
+
+  public static ErasureCodingPolicy getEcPolicy() {
+    return ecPolicy;
+  }
+
+  public ErasureCodeBenchmarkThroughput(FileSystem fs) {
+    Preconditions.checkArgument(fs instanceof DistributedFileSystem);
+    this.fs = fs;
+  }
+
+  enum OpType {
+    READ, WRITE, GEN, CLEAN;
+  }
+
+  public static String getFilePath(int dataSizeMB, boolean isEc) {
+    String parent = isEc ? EC_DIR : REP_DIR;
+    String file = isEc ? EC_FILE_BASE : REP_FILE_BASE;
+    return parent + "/" + file + dataSizeMB + "MB";
+  }
+
+  private static void printUsage(String msg) {
+    if (msg != null) {
+      System.err.println(msg);
+    }
+    System.err.println("Usage: ErasureCodeBenchmarkThroughput " +
+        "<read|write|gen|clean> <size in MB> <ec|rep> [num clients] [stf|pos]\n"
+        + "The stateful/positional option is only available for read.");
+    System.exit(1);
+  }
+
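+  // Submits one read or write task per client to a fixed-size thread pool
+  // and collects the per-client byte counts as the tasks complete.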
+  private List<Long> doBenchmark(boolean isRead, int dataSizeMB, int numClients,
+      boolean isEc, boolean statefulRead, boolean isGen) throws Exception {
+    ExecutorService executor = Executors.newFixedThreadPool(numClients);
+    CompletionService<Long> cs = new ExecutorCompletionService<>(executor);
+    for (int i = 0; i < numClients; i++) {
+      cs.submit(isRead ?
+          new ReadCallable(dataSizeMB, isEc, i, statefulRead) :
+          new WriteCallable(dataSizeMB, isEc, i, isGen));
+    }
+    List<Long> results = new ArrayList<>(numClients);
+    for (int i = 0; i < numClients; i++) {
+      results.add(cs.take().get());
+    }
+    executor.shutdown();
+    return results;
+  }
+
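+  // Striped (EC) reads fan out to all data blocks of a block group, so the
+  // striped-read thread pool is sized to numClients * numDataUnits.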
+  private void setReadThreadPoolSize(int numClients) {
+    int numThread = numClients * ecPolicy.getNumDataUnits();
+    getConf().setInt(HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY,
+        numThread);
+  }
+
+  private DecimalFormat getDecimalFormat() {
+    return new DecimalFormat("#.##");
+  }
+
+  private void benchmark(OpType type, int dataSizeMB,
+      int numClients, boolean isEc, boolean statefulRead) throws Exception {
+    List<Long> sizes = null;
+    StopWatch sw = new StopWatch().start();
+    switch (type) {
+      case READ:
+        sizes = doBenchmark(true, dataSizeMB, numClients, isEc,
+            statefulRead, false);
+        break;
+      case WRITE:
+        sizes = doBenchmark(
+            false, dataSizeMB, numClients, isEc, statefulRead, false);
+        break;
+      case GEN:
+        sizes = doBenchmark(false, dataSizeMB, numClients, isEc,
+            statefulRead, true);
+        break;
+    }
+    // Use millisecond resolution and guard against a zero elapsed time, so
+    // short runs do not report an infinite throughput.
+    double elapsedSec = sw.now(TimeUnit.MILLISECONDS) / 1000.0;
+    double totalDataSizeMB = 0;
+    for (Long size : sizes) {
+      if (size >= 0) {
+        totalDataSizeMB += size.doubleValue() / 1024 / 1024;
+      }
+    }
+    double throughput = elapsedSec > 0 ? totalDataSizeMB / elapsedSec : 0;
+    DecimalFormat df = getDecimalFormat();
+    System.out.println(type + " " + df.format(totalDataSizeMB) +
+        " MB data takes: " + df.format(elapsedSec) + " s.\nTotal throughput: " +
+        df.format(throughput) + " MB/s.");
+  }
+
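+  // Creates the replica and EC benchmark directories, applying the system
+  // default erasure coding policy to the EC directory. If a directory
+  // already exists, verifies that its policy matches what the tool expects.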
+  private void setUpDir() throws IOException {
+    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    dfs.mkdirs(new Path(DFS_TMP_DIR));
+    Path repPath = new Path(REP_DIR);
+    Path ecPath = new Path(EC_DIR);
+    if (!dfs.exists(repPath)) {
+      dfs.mkdirs(repPath);
+    } else {
+      Preconditions.checkArgument(
+          dfs.getClient().getErasureCodingPolicy(repPath.toString()) == null);
+    }
+    if (!dfs.exists(ecPath)) {
+      dfs.mkdirs(ecPath);
+      dfs.getClient().setErasureCodingPolicy(ecPath.toString(), ecPolicy);
+    } else {
+      Preconditions.checkArgument(
+          dfs.getClient().
+              getErasureCodingPolicy(ecPath.toString()).equals(ecPolicy));
+    }
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    OpType type = null;
+    int dataSizeMB = 0;
+    boolean isEc = true;
+    int numClients = 1;
+    boolean statefulRead = true;
+    if (args.length >= 3) {
+      if (args[0].equals("read")) {
+        type = OpType.READ;
+      } else if (args[0].equals("write")) {
+        type = OpType.WRITE;
+      } else if (args[0].equals("gen")) {
+        type = OpType.GEN;
+      } else if (args[0].equals("clean")) {
+        type = OpType.CLEAN;
+      } else {
+        printUsage("Unknown operation: " + args[0]);
+      }
+      try {
+        dataSizeMB = Integer.parseInt(args[1]);
+        if (dataSizeMB <= 0) {
+          printUsage("Invalid data size: " + dataSizeMB);
+        }
+      } catch (NumberFormatException e) {
+        printUsage("Invalid data size: " + e.getMessage());
+      }
+      isEc = args[2].equals("ec");
+      if (!isEc && !args[2].equals("rep")) {
+        printUsage("Unknown storage policy: " + args[2]);
+      }
+    } else {
+      printUsage(null);
+    }
+    if (args.length >= 4 && type != OpType.CLEAN) {
+      try {
+        numClients = Integer.parseInt(args[3]);
+        if (numClients <= 0) {
+          printUsage("Invalid num of clients: " + numClients);
+        }
+      } catch (NumberFormatException e) {
+        printUsage("Invalid num of clients: " + e.getMessage());
+      }
+    }
+    if (args.length >= 5 && type == OpType.READ) {
+      statefulRead = args[4].equals("stf");
+      if (!statefulRead && !args[4].equals("pos")) {
+        printUsage("Unknown read mode: " + args[4]);
+      }
+    }
+
+    setUpDir();
+    if (type == OpType.CLEAN) {
+      cleanUp(dataSizeMB, isEc);
+    } else {
+      if (type == OpType.READ && isEc) {
+        setReadThreadPoolSize(numClients);
+      }
+      benchmark(type, dataSizeMB, numClients, isEc, statefulRead);
+    }
+    return 0;
+  }
+
+  private void cleanUp(int dataSizeMB, boolean isEc) throws IOException {
+    final String fileName = getFilePath(dataSizeMB, isEc);
+    Path path = isEc ? new Path(EC_DIR) : new Path(REP_DIR);
+    FileStatus[] fileStatuses = fs.listStatus(path, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.toString().contains(fileName);
+      }
+    });
+    for (FileStatus fileStatus : fileStatuses) {
+      fs.delete(fileStatus.getPath(), false);
+    }
+  }
+
+  /**
+   * A Callable that returns the number of bytes read or written.
+   */
+  private abstract class CallableBase implements Callable<Long> {
+    protected final int dataSizeMB;
+    protected final boolean isEc;
+    protected final int id;
+
+    public CallableBase(int dataSizeMB, boolean isEc, int id)
+        throws IOException {
+      this.dataSizeMB = dataSizeMB;
+      this.isEc = isEc;
+      this.id = id;
+    }
+
+    protected String getFilePathForThread() {
+      return getFilePath(dataSizeMB, isEc) + "_" + id;
+    }
+  }
+
+  private class WriteCallable extends CallableBase {
+    private final boolean isGen;
+
+    public WriteCallable(int dataSizeMB, boolean isEc, int id, boolean isGen)
+        throws IOException {
+      super(dataSizeMB, isEc, id);
+      this.isGen = isGen;
+    }
+
+    private long writeFile(Path path) throws IOException {
+      StopWatch sw = new StopWatch().start();
+      System.out.println("Writing " + path);
+      long dataSize = dataSizeMB * 1024 * 1024L;
+      long remaining = dataSize;
+      try (FSDataOutputStream outputStream = fs.create(path)) {
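+        // Plain write tests produce transient files: register them for
+        // deletion when the FileSystem shuts down. Generated data ('gen')
+        // is kept for later read tests.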
+        if (!isGen) {
+          fs.deleteOnExit(path);
+        }
+        int toWrite;
+        while (remaining > 0) {
+          toWrite = (int) Math.min(remaining, data.length);
+          outputStream.write(data, 0, toWrite);
+          remaining -= toWrite;
+        }
+        System.out.println("Finished writing " + path + ". Time taken: " +
+            sw.now(TimeUnit.SECONDS) + " s.");
+        return dataSize - remaining;
+      }
+    }
+
+    @Override
+    public Long call() throws Exception {
+      String pathStr = getFilePathForThread();
+      if (!isGen) {
+        pathStr += TMP_FILE_SUFFIX;
+      }
+      final Path path = new Path(pathStr);
+      if (fs.exists(path)) {
+        if (isGen) {
+          System.out.println("Data already generated at " + path);
+        } else {
+          System.out.println("Previous tmp data not cleaned " + path);
+        }
+        return 0L;
+      }
+      return writeFile(path);
+    }
+  }
+
+  private class ReadCallable extends CallableBase {
+    private final boolean statefulRead;
+
+    public ReadCallable(int dataSizeMB, boolean isEc, int id,
+        boolean statefulRead) throws IOException {
+      super(dataSizeMB, isEc, id);
+      this.statefulRead = statefulRead;
+    }
+
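+    // Stateful read: sequential reads that advance the stream's position.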
+    private long doStateful(FSDataInputStream inputStream) throws IOException {
+      long count = 0;
+      long bytesRead;
+      ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE_MB * 1024 * 1024);
+      while (true) {
+        bytesRead = inputStream.read(buffer);
+        if (bytesRead < 0) {
+          break;
+        }
+        count += bytesRead;
+        buffer.clear();
+      }
+      return count;
+    }
+
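+    // Positional read (pread): each call passes an explicit offset and does
+    // not move the stream position.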
+    private long doPositional(FSDataInputStream inputStream)
+        throws IOException {
+      long count = 0;
+      long bytesRead;
+      byte[] buf = new byte[BUFFER_SIZE_MB * 1024 * 1024];
+      while (true) {
+        bytesRead = inputStream.read(count, buf, 0, buf.length);
+        if (bytesRead < 0) {
+          break;
+        }
+        count += bytesRead;
+      }
+      return count;
+    }
+
+    private long readFile(Path path) throws IOException {
+      try (FSDataInputStream inputStream = fs.open(path)) {
+        StopWatch sw = new StopWatch().start();
+        System.out.println((statefulRead ? "Stateful reading " :
+            "Positional reading ") + path);
+        long totalRead = statefulRead ? doStateful(inputStream) :
+            doPositional(inputStream);
+        System.out.println(
+            (statefulRead ? "Finished stateful read " :
+                "Finished positional read ") + path + ". Time taken: " +
+                sw.now(TimeUnit.SECONDS) + " s.");
+        return totalRead;
+      }
+    }
+
+    @Override
+    public Long call() throws Exception {
+      Path path = new Path(getFilePathForThread());
+      if (!fs.exists(path) || fs.isDirectory(path)) {
+        System.out.println("File not found at " + path +
+            ". Call gen first?");
+        return 0L;
+      }
+      long bytesRead = readFile(path);
+      long dataSize = dataSizeMB * 1024 * 1024L;
+      Preconditions.checkArgument(bytesRead == dataSize,
+          "Specified data size: %s, actually read: %s", dataSize, bytesRead);
+      return bytesRead;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    FileSystem fs = FileSystem.get(conf);
+    int res = ToolRunner.run(conf,
+        new ErasureCodeBenchmarkThroughput(fs), args);
+    System.exit(res);
+  }
+}

+ 116 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodeBenchmarkThroughput.java

@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Tests for {@link org.apache.hadoop.hdfs.ErasureCodeBenchmarkThroughput}.
+ */
+public class TestErasureCodeBenchmarkThroughput {
+  private static MiniDFSCluster cluster;
+  private static Configuration conf;
+  private static FileSystem fs;
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    conf = new HdfsConfiguration();
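+    // An EC file needs numDataUnits + numParityUnits DataNodes so that each
+    // block of a block group can land on a distinct node.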
+    int numDN = ErasureCodeBenchmarkThroughput.getEcPolicy().getNumDataUnits() +
+        ErasureCodeBenchmarkThroughput.getEcPolicy().getNumParityUnits();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDN).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown(true);
+    }
+  }
+
+  private static void runBenchmark(String[] args) throws Exception {
+    Assert.assertNotNull(conf);
+    Assert.assertNotNull(fs);
+    Assert.assertEquals(0, ToolRunner.run(conf,
+        new ErasureCodeBenchmarkThroughput(fs), args));
+  }
+
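+  // Counts the benchmark files under the EC or replica directory whose names
+  // match the expected path for the given data size.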
+  private static void verifyNumFile(final int dataSize, final boolean isEc,
+      int numFile) throws IOException {
+    Path path = isEc ? new Path(ErasureCodeBenchmarkThroughput.EC_DIR) :
+        new Path(ErasureCodeBenchmarkThroughput.REP_DIR);
+    FileStatus[] statuses = fs.listStatus(path, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.toString().contains(
+            ErasureCodeBenchmarkThroughput.getFilePath(dataSize, isEc));
+      }
+    });
+    Assert.assertEquals(numFile, statuses.length);
+  }
+
+  @Test
+  public void testReplicaReadWrite() throws Exception {
+    int dataSize = 10;
+    int numClient = 3;
+    String[] args = new String[]{"write", Integer.toString(dataSize), "rep",
+        Integer.toString(numClient)};
+    runBenchmark(args);
+    args[0] = "gen";
+    runBenchmark(args);
+    args[0] = "read";
+    runBenchmark(args);
+  }
+
+  @Test
+  public void testECReadWrite() throws Exception {
+    int dataSize = 5;
+    int numClient = 5;
+    String[] args = new String[]{"write", Integer.toString(dataSize), "ec",
+        Integer.toString(numClient)};
+    runBenchmark(args);
+    args[0] = "gen";
+    runBenchmark(args);
+    args[0] = "read";
+    runBenchmark(args);
+  }
+
+  @Test
+  public void testCleanUp() throws Exception {
+    int dataSize = 5;
+    int numClient = 5;
+    String[] args = new String[]{"gen", Integer.toString(dataSize), "ec",
+        Integer.toString(numClient)};
+    runBenchmark(args);
+    args[0] = "clean";
+    runBenchmark(args);
+    verifyNumFile(dataSize, true, 0);
+  }
+}