
HDFS-16286. Add a debug tool to verify the correctness of erasure coding on file (#3593)

(cherry picked from commit a21895a5b3644944fe04cf558d593b96da0263fd)

 Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java

(cherry picked from commit 29fd36e2f1e3b34b3cb4c047d93bf88a70dc8149)
daimin 3 years ago
parent
commit
cbf4a33549

+ 231 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java

@@ -25,15 +25,42 @@ import java.io.FileOutputStream;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderRemote;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -68,6 +95,7 @@ public class DebugAdmin extends Configured implements Tool {
       new VerifyMetaCommand(),
       new ComputeMetaCommand(),
       new RecoverLeaseCommand(),
+      new VerifyECCommand(),
       new HelpCommand()
   };
 
@@ -386,6 +414,209 @@ public class DebugAdmin extends Configured implements Tool {
     }
   }
 
+  /**
+   * The command for verifying the correctness of erasure coding on an erasure coded file.
+   */
+  private class VerifyECCommand extends DebugCommand {
+    private DFSClient client;
+    private int dataBlkNum;
+    private int parityBlkNum;
+    private int cellSize;
+    private boolean useDNHostname;
+    private CachingStrategy cachingStrategy;
+    private int stripedReadBufferSize;
+    private CompletionService<Integer> readService;
+    private RawErasureEncoder encoder;
+    private BlockReader[] blockReaders;
+
+    VerifyECCommand() {
+      super("verifyEC",
+          "verifyEC -file <file>",
+          "  Verify HDFS erasure coding on all block groups of the file.");
+    }
+
+    int run(List<String> args) throws IOException {
+      if (args.size() < 2) {
+        System.out.println(usageText);
+        System.out.println(helpText + System.lineSeparator());
+        return 1;
+      }
+      String file = StringUtils.popOptionWithArgument("-file", args);
+      Path path = new Path(file);
+      DistributedFileSystem dfs = AdminHelper.getDFS(getConf());
+      this.client = dfs.getClient();
+
+      FileStatus fileStatus;
+      try {
+        fileStatus = dfs.getFileStatus(path);
+      } catch (FileNotFoundException e) {
+        System.err.println("File " + file + " does not exist.");
+        return 1;
+      }
+
+      if (!fileStatus.isFile()) {
+        System.err.println("File " + file + " is not a regular file.");
+        return 1;
+      }
+      if (!dfs.isFileClosed(path)) {
+        System.err.println("File " + file + " is not closed.");
+        return 1;
+      }
+      this.useDNHostname = getConf().getBoolean(DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
+          DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
+      this.cachingStrategy = CachingStrategy.newDefaultStrategy();
+      this.stripedReadBufferSize = getConf().getInt(
+          DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
+          DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_DEFAULT);
+
+      LocatedBlocks locatedBlocks = client.getLocatedBlocks(file, 0, fileStatus.getLen());
+      if (locatedBlocks.getErasureCodingPolicy() == null) {
+        System.err.println("File " + file + " is not erasure coded.");
+        return 1;
+      }
+      ErasureCodingPolicy ecPolicy = locatedBlocks.getErasureCodingPolicy();
+      this.dataBlkNum = ecPolicy.getNumDataUnits();
+      this.parityBlkNum = ecPolicy.getNumParityUnits();
+      this.cellSize = ecPolicy.getCellSize();
+      this.encoder = CodecUtil.createRawEncoder(getConf(), ecPolicy.getCodecName(),
+          new ErasureCoderOptions(
+              ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits()));
+      int blockNum = dataBlkNum + parityBlkNum;
+      this.readService = new ExecutorCompletionService<>(
+          DFSUtilClient.getThreadPoolExecutor(blockNum, blockNum, 60,
+              new LinkedBlockingQueue<>(), "read-", false));
+      this.blockReaders = new BlockReader[dataBlkNum + parityBlkNum];
+
+      for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+        System.out.println("Checking EC block group: blk_" + locatedBlock.getBlock().getBlockId());
+        LocatedStripedBlock blockGroup = (LocatedStripedBlock) locatedBlock;
+
+        try {
+          verifyBlockGroup(blockGroup);
+          System.out.println("Status: OK");
+        } catch (Exception e) {
+          System.err.println("Status: ERROR, message: " + e.getMessage());
+          return 1;
+        } finally {
+          closeBlockReaders();
+        }
+      }
+      System.out.println("\nAll EC block group status: OK");
+      return 0;
+    }
+
+    private void verifyBlockGroup(LocatedStripedBlock blockGroup) throws Exception {
+      final LocatedBlock[] indexedBlocks = StripedBlockUtil.parseStripedBlockGroup(blockGroup,
+          cellSize, dataBlkNum, parityBlkNum);
+
+      int blockNumExpected = Math.min(dataBlkNum,
+          (int) ((blockGroup.getBlockSize() - 1) / cellSize + 1)) + parityBlkNum;
+      if (blockGroup.getBlockIndices().length < blockNumExpected) {
+        throw new Exception("Block group is under-erasure-coded.");
+      }
+
+      long maxBlockLen = 0L;
+      DataChecksum checksum = null;
+      for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
+        LocatedBlock block = indexedBlocks[i];
+        if (block == null) {
+          blockReaders[i] = null;
+          continue;
+        }
+        if (block.getBlockSize() > maxBlockLen) {
+          maxBlockLen = block.getBlockSize();
+        }
+        BlockReader blockReader = createBlockReader(block.getBlock(),
+            block.getLocations()[0], block.getBlockToken());
+        if (checksum == null) {
+          checksum = blockReader.getDataChecksum();
+        } else {
+          assert checksum.equals(blockReader.getDataChecksum());
+        }
+        blockReaders[i] = blockReader;
+      }
+      assert checksum != null;
+      int bytesPerChecksum = checksum.getBytesPerChecksum();
+      int bufferSize = stripedReadBufferSize < bytesPerChecksum ? bytesPerChecksum :
+          stripedReadBufferSize - stripedReadBufferSize % bytesPerChecksum;
+      final ByteBuffer[] buffers = new ByteBuffer[dataBlkNum + parityBlkNum];
+      final ByteBuffer[] outputs = new ByteBuffer[parityBlkNum];
+      for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
+        buffers[i] = ByteBuffer.allocate(bufferSize);
+      }
+      for (int i = 0; i < parityBlkNum; i++) {
+        outputs[i] = ByteBuffer.allocate(bufferSize);
+      }
+      long positionInBlock = 0L;
+      while (positionInBlock < maxBlockLen) {
+        final int toVerifyLen = (int) Math.min(bufferSize, maxBlockLen - positionInBlock);
+        List<Future<Integer>> futures = new ArrayList<>(dataBlkNum + parityBlkNum);
+        for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
+          final int fi = i;
+          futures.add(this.readService.submit(() -> {
+            BlockReader blockReader = blockReaders[fi];
+            ByteBuffer buffer = buffers[fi];
+            buffer.clear();
+            buffer.limit(toVerifyLen);
+            int readLen = 0;
+            if (blockReader != null) {
+              int toRead = buffer.remaining();
+              while (readLen < toRead) {
+                int nread = blockReader.read(buffer);
+                if (nread <= 0) {
+                  break;
+                }
+                readLen += nread;
+              }
+            }
+            while (buffer.hasRemaining()) {
+              buffer.put((byte) 0);
+            }
+            buffer.flip();
+            return readLen;
+          }));
+        }
+        for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
+          futures.get(i).get(1, TimeUnit.MINUTES);
+        }
+        ByteBuffer[] inputs = new ByteBuffer[dataBlkNum];
+        System.arraycopy(buffers, 0, inputs, 0, dataBlkNum);
+        for (int i = 0; i < parityBlkNum; i++) {
+          outputs[i].clear();
+          outputs[i].limit(toVerifyLen);
+        }
+        this.encoder.encode(inputs, outputs);
+        for (int i = 0; i < parityBlkNum; i++) {
+          if (!buffers[dataBlkNum + i].equals(outputs[i])) {
+            throw new Exception("EC compute result does not match.");
+          }
+        }
+        positionInBlock += toVerifyLen;
+      }
+    }
+
+    private BlockReader createBlockReader(ExtendedBlock block, DatanodeInfo dnInfo,
+                                          Token<BlockTokenIdentifier> token) throws IOException {
+      InetSocketAddress dnAddress = NetUtils.createSocketAddr(dnInfo.getXferAddr(useDNHostname));
+      Peer peer = client.newConnectedPeer(dnAddress, token, dnInfo);
+      return BlockReaderRemote.newBlockReader(
+          "dummy", block, token, 0,
+          block.getNumBytes(), true, "", peer, dnInfo,
+          null, cachingStrategy, -1);
+    }
+
+    private void closeBlockReaders() {
+      for (int i = 0; i < blockReaders.length; i++) {
+        if (blockReaders[i] != null) {
+          IOUtils.closeStream(blockReaders[i]);
+          blockReaders[i] = null;
+        }
+      }
+    }
+
+  }
+
   /**
    * The command for getting help about other commands.
    */
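
Distilled, the verification loop above does three things per stripe: read one cell from every data and parity block, re-encode the data cells with the file's codec, and compare the recomputed parity against the parity that was read. A minimal self-contained sketch of that core check, assuming hadoop-common on the classpath and using hypothetical fixed RS(3,2) parameters in place of values taken from a real file's erasure coding policy:

import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;

public class EcParityCheckSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical fixed parameters; verifyEC reads these from the file's EC policy.
    final int dataUnits = 3;
    final int parityUnits = 2;
    final int cellSize = 1024;

    RawErasureEncoder encoder = CodecUtil.createRawEncoder(new Configuration(),
        "rs", new ErasureCoderOptions(dataUnits, parityUnits));

    // Stand-ins for one stripe of data cells; verifyEC fills these from block readers.
    ByteBuffer[] dataCells = new ByteBuffer[dataUnits];
    for (int i = 0; i < dataUnits; i++) {
      dataCells[i] = ByteBuffer.allocate(cellSize);
      while (dataCells[i].hasRemaining()) {
        dataCells[i].put((byte) i);
      }
      dataCells[i].flip();
    }

    // Recompute parity locally from the data cells.
    ByteBuffer[] recomputed = new ByteBuffer[parityUnits];
    for (int i = 0; i < parityUnits; i++) {
      recomputed[i] = ByteBuffer.allocate(cellSize);
    }
    encoder.encode(dataCells, recomputed);

    // Stand-in for the parity cells read from the parity blocks; in verifyEC a
    // mismatch here means the block group is corrupt.
    ByteBuffer[] storedParity = recomputed;
    for (int i = 0; i < parityUnits; i++) {
      if (!storedParity[i].equals(recomputed[i])) {
        throw new IllegalStateException("Parity cell " + i + " does not match.");
      }
    }
    System.out.println("Parity matches the recomputed cells.");
  }
}

The sketch uses the same CodecUtil.createRawEncoder/encode pair as the command; only the buffer source differs (synthetic cells here, remote block readers in verifyEC).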

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md

@@ -676,6 +676,16 @@ Usage: `hdfs debug recoverLease -path <path> [-retries <num-retries>]`
 
 Recover the lease on the specified path. The path must reside on an HDFS file system. The default number of retries is 1.
 
+### `verifyEC`
+
+Usage: `hdfs debug verifyEC -file <file>`
+
+| COMMAND\_OPTION | Description |
+|:---- |:---- |
+| [`-file` *EC-file*] | HDFS EC file to be verified. |
+
+Verify the correctness of erasure coding on an erasure coded file.
+
 dfsadmin with ViewFsOverloadScheme
 ----------------------------------
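
Following the documented usage, an illustrative session with the new subcommand (the path and block ID are hypothetical; the lines match the messages printed by VerifyECCommand):

    $ hdfs debug verifyEC -file /ec/foo_1m
    Checking EC block group: blk_-9223372036854775776
    Status: OK

    All EC block group status: OK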
 

+ 110 - 8
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDebugAdmin.java

@@ -17,15 +17,22 @@
  */
 package org.apache.hadoop.hdfs.tools;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -34,6 +41,8 @@ import org.junit.Test;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.PrintStream;
+import java.util.List;
+import java.util.Random;
 
 import static org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil.*;
 import static org.junit.Assert.assertEquals;
@@ -44,23 +53,16 @@ public class TestDebugAdmin {
   static private final String TEST_ROOT_DIR =
       new File(System.getProperty("test.build.data", "/tmp"),
           TestDebugAdmin.class.getSimpleName()).getAbsolutePath();
-
+  private Configuration conf = new Configuration();
   private MiniDFSCluster cluster;
-  private DistributedFileSystem fs;
   private DebugAdmin admin;
-  private DataNode datanode;
 
   @Before
   public void setUp() throws Exception {
     final File testRoot = new File(TEST_ROOT_DIR);
     testRoot.delete();
     testRoot.mkdirs();
-    Configuration conf = new Configuration();
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
-    cluster.waitActive();
-    fs = cluster.getFileSystem();
     admin = new DebugAdmin(conf);
-    datanode = cluster.getDataNodes().get(0);
   }
 
   @After
@@ -92,8 +94,11 @@ public class TestDebugAdmin {
 
   @Test(timeout = 60000)
   public void testRecoverLease() throws Exception {
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
     assertEquals("ret: 1, You must supply a -path argument to recoverLease.",
         runCmd(new String[]{"recoverLease", "-retries", "1"}));
+    DistributedFileSystem fs = cluster.getFileSystem();
     FSDataOutputStream out = fs.create(new Path("/foo"));
     out.write(123);
     out.close();
@@ -103,6 +108,10 @@ public class TestDebugAdmin {
 
   @Test(timeout = 60000)
   public void testVerifyMetaCommand() throws Exception {
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    DataNode datanode = cluster.getDataNodes().get(0);
     DFSTestUtil.createFile(fs, new Path("/bar"), 1234, (short) 1, 0xdeadbeef);
     FsDatasetSpi<?> fsd = datanode.getFSDataset();
     ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/bar"));
@@ -128,6 +137,10 @@ public class TestDebugAdmin {
 
   @Test(timeout = 60000)
   public void testComputeMetaCommand() throws Exception {
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    DataNode datanode = cluster.getDataNodes().get(0);
     DFSTestUtil.createFile(fs, new Path("/bar"), 1234, (short) 1, 0xdeadbeef);
     FsDatasetSpi<?> fsd = datanode.getFSDataset();
     ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/bar"));
@@ -166,8 +179,97 @@ public class TestDebugAdmin {
 
   @Test(timeout = 60000)
   public void testRecoverLeaseforFileNotFound() throws Exception {
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
     assertTrue(runCmd(new String[] {
         "recoverLease", "-path", "/foo", "-retries", "2" }).contains(
         "Giving up on recoverLease for /foo after 1 try"));
   }
+
+  @Test(timeout = 60000)
+  public void testVerifyECCommand() throws Exception {
+    final ErasureCodingPolicy ecPolicy = SystemErasureCodingPolicies.getByID(
+        SystemErasureCodingPolicies.RS_3_2_POLICY_ID);
+    cluster = DFSTestUtil.setupCluster(conf, 6, 5, 0);
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+
+    assertEquals("ret: 1, verifyEC -file <file>  Verify HDFS erasure coding on " +
+        "all block groups of the file.", runCmd(new String[]{"verifyEC"}));
+
+    assertEquals("ret: 1, File /bar does not exist.",
+        runCmd(new String[]{"verifyEC", "-file", "/bar"}));
+
+    fs.create(new Path("/bar")).close();
+    assertEquals("ret: 1, File /bar is not erasure coded.",
+        runCmd(new String[]{"verifyEC", "-file", "/bar"}));
+
+    final Path ecDir = new Path("/ec");
+    fs.mkdir(ecDir, FsPermission.getDirDefault());
+    fs.enableErasureCodingPolicy(ecPolicy.getName());
+    fs.setErasureCodingPolicy(ecDir, ecPolicy.getName());
+
+    assertEquals("ret: 1, File /ec is not a regular file.",
+        runCmd(new String[]{"verifyEC", "-file", "/ec"}));
+
+    fs.create(new Path(ecDir, "foo"));
+    assertEquals("ret: 1, File /ec/foo is not closed.",
+        runCmd(new String[]{"verifyEC", "-file", "/ec/foo"}));
+
+    final short repl = 1;
+    final long k = 1024;
+    final long m = k * k;
+    final long seed = 0x1234567L;
+    DFSTestUtil.createFile(fs, new Path(ecDir, "foo_65535"), 65535, repl, seed);
+    assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_65535"})
+        .contains("All EC block group status: OK"));
+    DFSTestUtil.createFile(fs, new Path(ecDir, "foo_256k"), 256 * k, repl, seed);
+    assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_256k"})
+        .contains("All EC block group status: OK"));
+    DFSTestUtil.createFile(fs, new Path(ecDir, "foo_1m"), m, repl, seed);
+    assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_1m"})
+        .contains("All EC block group status: OK"));
+    DFSTestUtil.createFile(fs, new Path(ecDir, "foo_2m"), 2 * m, repl, seed);
+    assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_2m"})
+        .contains("All EC block group status: OK"));
+    DFSTestUtil.createFile(fs, new Path(ecDir, "foo_3m"), 3 * m, repl, seed);
+    assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_3m"})
+        .contains("All EC block group status: OK"));
+    DFSTestUtil.createFile(fs, new Path(ecDir, "foo_5m"), 5 * m, repl, seed);
+    assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_5m"})
+        .contains("All EC block group status: OK"));
+    DFSTestUtil.createFile(fs, new Path(ecDir, "foo_6m"), (int) k, 6 * m, m, repl, seed);
+    assertEquals("ret: 0, Checking EC block group: blk_x;Status: OK" +
+            "Checking EC block group: blk_x;Status: OK" +
+            "All EC block group status: OK",
+        runCmd(new String[]{"verifyEC", "-file", "/ec/foo_6m"})
+            .replaceAll("blk_-[0-9]+", "blk_x;"));
+
+    Path corruptFile = new Path(ecDir, "foo_corrupt");
+    DFSTestUtil.createFile(fs, corruptFile, 5841961, repl, seed);
+    List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(fs, corruptFile);
+    assertEquals(1, blocks.size());
+    LocatedStripedBlock blockGroup = (LocatedStripedBlock) blocks.get(0);
+    LocatedBlock[] indexedBlocks = StripedBlockUtil.parseStripedBlockGroup(blockGroup,
+        ecPolicy.getCellSize(), ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
+    // Try corrupt block 0 in block group.
+    LocatedBlock toCorruptLocatedBlock = indexedBlocks[0];
+    ExtendedBlock toCorruptBlock = toCorruptLocatedBlock.getBlock();
+    DataNode datanode = cluster.getDataNode(toCorruptLocatedBlock.getLocations()[0].getIpcPort());
+    File blockFile = getBlockFile(datanode.getFSDataset(),
+        toCorruptBlock.getBlockPoolId(), toCorruptBlock.getLocalBlock());
+    File metaFile = getMetaFile(datanode.getFSDataset(),
+        toCorruptBlock.getBlockPoolId(), toCorruptBlock.getLocalBlock());
+    // Write error bytes to block file and re-generate meta checksum.
+    byte[] errorBytes = new byte[2097152];
+    new Random(seed).nextBytes(errorBytes);
+    FileUtils.writeByteArrayToFile(blockFile, errorBytes);
+    metaFile.delete();
+    runCmd(new String[]{"computeMeta", "-block", blockFile.getAbsolutePath(),
+        "-out", metaFile.getAbsolutePath()});
+    assertTrue(runCmd(new String[]{"verifyEC", "-file", "/ec/foo_corrupt"})
+        .contains("Status: ERROR, message: EC compute result does not match."));
+  }
+
 }