
HDFS-13439. Add test case for read block operation when it is moved. Contributed by Ajay Kumar.

Arpit Agarwal 7 years ago
parent
commit
d907fdc3cd

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -970,7 +970,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * @return newReplicaInfo
    * @throws IOException
    */
-  private ReplicaInfo moveBlock(ExtendedBlock block, ReplicaInfo replicaInfo,
+  @VisibleForTesting
+  ReplicaInfo moveBlock(ExtendedBlock block, ReplicaInfo replicaInfo,
       FsVolumeReference volumeRef) throws IOException {
     ReplicaInfo newReplicaInfo = copyReplicaToVolume(block, replicaInfo,
         volumeRef);

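The change above relaxes moveBlock from private to package-private and marks it @VisibleForTesting, so tests in the same package can drive a block move directly instead of going through the mover path. A minimal sketch of the call pattern this enables (mirroring the new test below; block, dataNode, and destVolume are assumed to be set up by the test):

    FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
    ReplicaInfo replicaInfo = fsDataSetImpl.getReplicaInfo(block);
    // Move the replica to another volume while a client may still be reading it.
    fsDataSetImpl.moveBlock(block, replicaInfo, destVolume.obtainReference());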
+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/BlockReaderTestUtil.java

@@ -87,6 +87,11 @@ public class BlockReaderTestUtil {
     this(replicationFactor, new HdfsConfiguration());
   }
 
+  public BlockReaderTestUtil(MiniDFSCluster cluster, HdfsConfiguration conf) {
+    this.conf = conf;
+    this.cluster = cluster;
+  }
+
   public BlockReaderTestUtil(int replicationFactor, HdfsConfiguration config) throws Exception {
     this.conf = config;
     conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, replicationFactor);

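The new constructor lets a test wrap an already-running MiniDFSCluster rather than having BlockReaderTestUtil spin up its own. Usage, as in the test added below (cluster, conf, and filePath come from the test's own setup):

    BlockReaderTestUtil util =
        new BlockReaderTestUtil(cluster, new HdfsConfiguration(conf));
    LocatedBlock blk = util.getFileBlocks(filePath, 512 * 2).get(0);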
+ 104 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java

@@ -20,17 +20,23 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
 
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -83,6 +89,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOUR
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNull;
@@ -832,8 +840,21 @@ public class TestFsDatasetImpl {
   private ReplicaInfo createNewReplicaObj(ExtendedBlock block, FsDatasetImpl
       fsDataSetImpl) throws IOException {
     ReplicaInfo replicaInfo = fsDataSetImpl.getReplicaInfo(block);
-    FsVolumeSpi destVolume = null;
+    FsVolumeSpi destVolume = getDestinationVolume(block, fsDataSetImpl);
+    return fsDataSetImpl.copyReplicaToVolume(block, replicaInfo,
+        destVolume.obtainReference());
+  }
 
+  /**
+   * Finds a new destination volume for the given block, i.e. a volume on the
+   * same datanode with a storage ID different from the block's current volume.
+   *
+   * @param block         - Extended Block
+   * @param fsDataSetImpl - FsDatasetImpl reference
+   * @return destination volume, or null if no other volume is found
+   * @throws IOException
+   */
+  private FsVolumeSpi getDestinationVolume(ExtendedBlock block, FsDatasetImpl
+      fsDataSetImpl) throws IOException {
+    FsVolumeSpi destVolume = null;
     final String srcStorageId = fsDataSetImpl.getVolume(block).getStorageID();
     try (FsVolumeReferences volumeReferences =
         fsDataSetImpl.getFsVolumeReferences()) {
@@ -844,8 +865,88 @@ public class TestFsDatasetImpl {
         }
       }
     }
-    return fsDataSetImpl.copyReplicaToVolume(block, replicaInfo,
-        destVolume.obtainReference());
+    return destVolume;
+  }
+
+  @Test(timeout = 3000000)
+  public void testBlockReadOpWhileMovingBlock() throws IOException {
+    MiniDFSCluster cluster = null;
+    try {
+
+      // Setup cluster
+      conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+      cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(1)
+          .storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK})
+          .storagesPerDatanode(2)
+          .build();
+      FileSystem fs = cluster.getFileSystem();
+      DataNode dataNode = cluster.getDataNodes().get(0);
+
+      // Create test file with ASCII data
+      Path filePath = new Path("/tmp/testData");
+      String blockData = RandomStringUtils.randomAscii(512 * 4);
+      FSDataOutputStream fout = fs.create(filePath);
+      fout.writeBytes(blockData);
+      fout.close();
+      assertEquals(blockData, DFSTestUtil.readFile(fs, filePath));
+
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
+      BlockReaderTestUtil util = new BlockReaderTestUtil(cluster, new
+          HdfsConfiguration(conf));
+      LocatedBlock blk = util.getFileBlocks(filePath, 512 * 2).get(0);
+      File[] blkFiles = cluster.getAllBlockFiles(block);
+
+      // Part 1: Read partial data from block
+      LOG.info("Reading partial data for block {} before moving it",
+          blk.getBlock().toString());
+      BlockReader blkReader = BlockReaderTestUtil.getBlockReader(
+          (DistributedFileSystem) fs, blk, 0, 512 * 2);
+      byte[] buf = new byte[512 * 2];
+      blkReader.read(buf, 0, 512);
+      assertEquals(blockData.substring(0, 512), new String(buf,
+          StandardCharsets.US_ASCII).substring(0, 512));
+
+      // Part 2: Move the block, then read the remaining data from it
+      FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
+      ReplicaInfo replicaInfo = fsDataSetImpl.getReplicaInfo(block);
+      FsVolumeSpi destVolume = getDestinationVolume(block, fsDataSetImpl);
+      assertNotNull("Destination volume should not be null.", destVolume);
+      fsDataSetImpl.moveBlock(block, replicaInfo, destVolume.obtainReference());
+      // Trigger block report to update block info in NN
+      cluster.triggerBlockReports();
+      blkReader.read(buf, 512, 512);
+      assertEquals(blockData.substring(0, 512 * 2), new String(buf,
+          StandardCharsets.US_ASCII).substring(0, 512 * 2));
+      blkReader = BlockReaderTestUtil.getBlockReader(
+          (DistributedFileSystem) fs,
+          blk, 0, blockData.length());
+      buf = new byte[512 * 4];
+      blkReader.read(buf, 0, 512 * 4);
+      assertEquals(blockData, new String(buf, StandardCharsets.US_ASCII));
+
+      // Part 3: 1. Close the block reader
+      // 2. Assert source block doesn't exist on initial volume
+      // 3. Assert new file location for block is different
+      // 4. Confirm client can read data from new location
+      blkReader.close();
+      ExtendedBlock block2 = DFSTestUtil.getFirstBlock(fs, filePath);
+      File[] blkFiles2 = cluster.getAllBlockFiles(block2);
+      blk = util.getFileBlocks(filePath, 512 * 4).get(0);
+      blkReader = BlockReaderTestUtil.getBlockReader(
+          (DistributedFileSystem) fs,
+          blk, 0, blockData.length());
+      blkReader.read(buf, 0, 512 * 4);
+
+      assertFalse(Files.exists(Paths.get(blkFiles[0].getAbsolutePath())));
+      assertNotEquals(blkFiles[0], blkFiles2[0]);
+      assertEquals(blockData, new String(buf, StandardCharsets.US_ASCII));
+
+    } finally {
+      if (cluster != null && cluster.isClusterUp()) {
+        cluster.shutdown();
+      }
+    }
   }
 
 }
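A possible follow-up assertion (not part of this patch; fsDataSetImpl, block, and destVolume as in the test above) would verify at the dataset level that the replica now reports the destination volume as its home:

    // Hypothetical extra check: the block's volume should now be the destination.
    FsVolumeSpi volumeAfterMove = fsDataSetImpl.getVolume(block);
    assertEquals(destVolume.getStorageID(), volumeAfterMove.getStorageID());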