@@ -17,222 +17,58 @@
  */
 package org.apache.hadoop.hdfs;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Test;
 import org.junit.Rule;
+import org.junit.Test;
 import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-public class TestReadStripedFileWithDecoding {
-  static final Log LOG = LogFactory.getLog(TestReadStripedFileWithDecoding.class);
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.CELL_SIZE;
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.NUM_DATA_UNITS;
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.NUM_PARITY_UNITS;
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.findFirstDataNode;
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.initializeCluster;
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.tearDownCluster;
 
-  static {
-    ((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class))
-        .getLogger().setLevel(Level.ALL);
-    GenericTestUtils.setLogLevel(BlockManager.LOG, Level.ALL);
-    GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.ALL);
-    GenericTestUtils.setLogLevel(NameNode.stateChangeLog, Level.ALL);
-  }
+public class TestReadStripedFileWithDecoding {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestReadStripedFileWithDecoding.class);
 
   private MiniDFSCluster cluster;
-  private DistributedFileSystem fs;
-  private final ErasureCodingPolicy ecPolicy =
-      StripedFileTestUtil.getDefaultECPolicy();
-  private final short dataBlocks = (short) ecPolicy.getNumDataUnits();
-  private final short parityBlocks =
-      (short) ecPolicy.getNumParityUnits();
-  private final int numDNs = dataBlocks + parityBlocks;
-  private final int cellSize = ecPolicy.getCellSize();
-  private final int stripPerBlock = 4;
-  private final int blockSize = cellSize * stripPerBlock;
-  private final int blockGroupSize = blockSize * dataBlocks;
-  private final int smallFileLength = blockGroupSize - 123;
-  private final int largeFileLength = blockGroupSize + 123;
-  private final int[] fileLengths = {smallFileLength, largeFileLength};
-  private final int[] dnFailureNums = getDnFailureNums();
-
-  private int[] getDnFailureNums() {
-    int[] dnFailureNums = new int[parityBlocks];
-    for (int i = 0; i < dnFailureNums.length; i++) {
-      dnFailureNums[i] = i + 1;
-    }
-    return dnFailureNums;
-  }
+  private DistributedFileSystem dfs;
 
   @Rule
   public Timeout globalTimeout = new Timeout(300000);
 
   @Before
   public void setup() throws IOException {
-    Configuration conf = new HdfsConfiguration();
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, 0);
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
-        false);
-    conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
-        StripedFileTestUtil.getDefaultECPolicy().getName());
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
-    cluster.getFileSystem().getClient().setErasureCodingPolicy("/",
-        StripedFileTestUtil.getDefaultECPolicy().getName());
-    fs = cluster.getFileSystem();
+    cluster = initializeCluster();
+    dfs = cluster.getFileSystem();
   }
 
   @After
   public void tearDown() throws IOException {
-    if (cluster != null) {
-      cluster.shutdown();
-      cluster = null;
-    }
-  }
-
-  /**
-   * Shutdown tolerable number of Datanode before reading.
-   * Verify the decoding works correctly.
-   */
-  @Test(timeout=300000)
-  public void testReadWithDNFailure() throws Exception {
-    for (int fileLength : fileLengths) {
-      for (int dnFailureNum : dnFailureNums) {
-        try {
-          // setup a new cluster with no dead datanode
-          setup();
-          testReadWithDNFailure(fileLength, dnFailureNum);
-        } catch (IOException ioe) {
-          String fileType = fileLength < (blockSize * dataBlocks) ?
-              "smallFile" : "largeFile";
-          LOG.error("Failed to read file with DN failure:"
-              + " fileType = "+ fileType
-              + ", dnFailureNum = " + dnFailureNum);
-        } finally {
-          // tear down the cluster
-          tearDown();
-        }
-      }
-    }
-  }
-
-  /**
-   * Corrupt tolerable number of block before reading.
-   * Verify the decoding works correctly.
-   */
-  @Test(timeout=300000)
-  public void testReadCorruptedData() throws IOException {
-    for (int fileLength : fileLengths) {
-      for (int dataDelNum = 1; dataDelNum <= parityBlocks; dataDelNum++) {
-        for (int parityDelNum = 0; (dataDelNum + parityDelNum) <= parityBlocks;
-             parityDelNum++) {
-          String src = "/corrupted_" + dataDelNum + "_" + parityDelNum;
-          testReadWithBlockCorrupted(src, fileLength,
-              dataDelNum, parityDelNum, false);
-        }
-      }
-    }
-  }
-
-  /**
-   * Delete tolerable number of block before reading.
-   * Verify the decoding works correctly.
-   */
-  @Test(timeout=300000)
-  public void testReadCorruptedDataByDeleting() throws IOException {
-    for (int fileLength : fileLengths) {
-      for (int dataDelNum = 1; dataDelNum <= parityBlocks; dataDelNum++) {
-        for (int parityDelNum = 0; (dataDelNum + parityDelNum) <= parityBlocks;
-             parityDelNum++) {
-          String src = "/deleted_" + dataDelNum + "_" + parityDelNum;
-          testReadWithBlockCorrupted(src, fileLength,
-              dataDelNum, parityDelNum, true);
-        }
-      }
-    }
-  }
-
-  private int findFirstDataNode(Path file, long length) throws IOException {
-    BlockLocation[] locs = fs.getFileBlockLocations(file, 0, length);
-    String name = (locs[0].getNames())[0];
-    int dnIndex = 0;
-    for (DataNode dn : cluster.getDataNodes()) {
-      int port = dn.getXferPort();
-      if (name.contains(Integer.toString(port))) {
-        return dnIndex;
-      }
-      dnIndex++;
-    }
-    return -1;
-  }
-
-  private void verifyRead(Path testPath, int length, byte[] expected)
-      throws IOException {
-    byte[] buffer = new byte[length + 100];
-    StripedFileTestUtil.verifyLength(fs, testPath, length);
-    StripedFileTestUtil.verifyPread(fs, testPath, length, expected, buffer);
-    StripedFileTestUtil.verifyStatefulRead(fs, testPath, length, expected, buffer);
-    StripedFileTestUtil.verifyStatefulRead(fs, testPath, length, expected,
-        ByteBuffer.allocate(length + 100));
-    StripedFileTestUtil.verifySeek(fs, testPath, length, ecPolicy,
-        blockGroupSize);
-  }
-
-  private void testReadWithDNFailure(int fileLength, int dnFailureNum)
-      throws Exception {
-    String fileType = fileLength < (blockSize * dataBlocks) ?
-        "smallFile" : "largeFile";
-    String src = "/dnFailure_" + dnFailureNum + "_" + fileType;
-    LOG.info("testReadWithDNFailure: file = " + src
-        + ", fileSize = " + fileLength
-        + ", dnFailureNum = " + dnFailureNum);
-
-    Path testPath = new Path(src);
-    final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength);
-    DFSTestUtil.writeFile(fs, testPath, bytes);
-    StripedFileTestUtil.waitBlockGroupsReported(fs, src);
-
-    // shut down the DN that holds an internal data block
-    BlockLocation[] locs = fs.getFileBlockLocations(testPath, cellSize * 5,
-        cellSize);
-    for (int failedDnIdx = 0; failedDnIdx < dnFailureNum; failedDnIdx++) {
-      String name = (locs[0].getNames())[failedDnIdx];
-      for (DataNode dn : cluster.getDataNodes()) {
-        int port = dn.getXferPort();
-        if (name.contains(Integer.toString(port))) {
-          dn.shutdown();
-        }
-      }
-    }
-
-    // check file length, pread, stateful read and seek
-    verifyRead(testPath, fileLength, bytes);
+    tearDownCluster(cluster);
   }
 
   /**
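The new `setup()` and `tearDown()` above delegate the cluster lifecycle to `ReadStripedFileWithDecodingHelper`, which is introduced outside this hunk. A minimal sketch of the constants and lifecycle methods that class presumably exports, reconstructed from the static imports and from the `setup()`/`tearDown()` bodies removed above (the class name comes from the imports; the factoring and exact signatures are inferred from the call sites, not taken from this patch):

package org.apache.hadoop.hdfs;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;

// Hypothetical reconstruction of the shared test helper. The method bodies
// are lifted from the code removed above; only the factoring is assumed.
public final class ReadStripedFileWithDecodingHelper {
  private static final ErasureCodingPolicy EC_POLICY =
      StripedFileTestUtil.getDefaultECPolicy();
  public static final int CELL_SIZE = EC_POLICY.getCellSize();
  public static final int NUM_DATA_UNITS = EC_POLICY.getNumDataUnits();
  public static final int NUM_PARITY_UNITS = EC_POLICY.getNumParityUnits();
  // Four stripes per block, matching the removed stripPerBlock field.
  public static final int BLOCK_SIZE = CELL_SIZE * 4;

  private ReadStripedFileWithDecodingHelper() {
  }

  // Mirrors the removed setup(): an EC-enabled MiniDFSCluster with one
  // DataNode per data and parity unit, and NameNode replication streams
  // set to 0, as in the removed configuration.
  public static MiniDFSCluster initializeCluster() throws IOException {
    Configuration conf = new HdfsConfiguration();
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
    conf.setInt(
        DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, 0);
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
        false);
    conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
        StripedFileTestUtil.getDefaultECPolicy().getName());
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(NUM_DATA_UNITS + NUM_PARITY_UNITS).build();
    cluster.getFileSystem().getClient().setErasureCodingPolicy("/",
        StripedFileTestUtil.getDefaultECPolicy().getName());
    return cluster;
  }

  // Mirrors the removed tearDown(): shut the cluster down if it was started.
  public static void tearDownCluster(MiniDFSCluster cluster)
      throws IOException {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}

The remaining hunks switch the test's call sites over to this helper: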
@@ -245,15 +81,17 @@ public class TestReadStripedFileWithDecoding {
     final Path file = new Path("/corrupted");
     final int length = 10; // length of "corruption"
     final byte[] bytes = StripedFileTestUtil.generateBytes(length);
-    DFSTestUtil.writeFile(fs, file, bytes);
+    DFSTestUtil.writeFile(dfs, file, bytes);
 
     // corrupt the first data block
-    int dnIndex = findFirstDataNode(file, cellSize * dataBlocks);
+    int dnIndex = ReadStripedFileWithDecodingHelper.findFirstDataNode(
+        cluster, dfs, file, CELL_SIZE * NUM_DATA_UNITS);
     Assert.assertNotEquals(-1, dnIndex);
-    LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient()
-        .getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
+    LocatedStripedBlock slb = (LocatedStripedBlock) dfs.getClient()
+        .getLocatedBlocks(file.toString(), 0, CELL_SIZE * NUM_DATA_UNITS)
+        .get(0);
     final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
-        cellSize, dataBlocks, parityBlocks);
+        CELL_SIZE, NUM_DATA_UNITS, NUM_PARITY_UNITS);
     // find the first block file
     File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
     File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock());
@@ -272,7 +110,7 @@ public class TestReadStripedFileWithDecoding {
     try {
       // do stateful read
-      StripedFileTestUtil.verifyStatefulRead(fs, file, length, bytes,
+      StripedFileTestUtil.verifyStatefulRead(dfs, file, length, bytes,
           ByteBuffer.allocate(1024));
 
       // check whether the corruption has been reported to the NameNode
@@ -293,110 +131,35 @@ public class TestReadStripedFileWithDecoding {
     final Path file = new Path("/invalidate");
     final int length = 10;
     final byte[] bytes = StripedFileTestUtil.generateBytes(length);
-    DFSTestUtil.writeFile(fs, file, bytes);
+    DFSTestUtil.writeFile(dfs, file, bytes);
 
-    int dnIndex = findFirstDataNode(file, cellSize * dataBlocks);
+    int dnIndex = findFirstDataNode(cluster, dfs, file,
+        CELL_SIZE * NUM_DATA_UNITS);
     Assert.assertNotEquals(-1, dnIndex);
-    LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient()
-        .getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
+    LocatedStripedBlock slb = (LocatedStripedBlock) dfs.getClient()
+        .getLocatedBlocks(file.toString(), 0, CELL_SIZE * NUM_DATA_UNITS)
+        .get(0);
     final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
-        cellSize, dataBlocks, parityBlocks);
+        CELL_SIZE, NUM_DATA_UNITS, NUM_PARITY_UNITS);
     final Block b = blks[0].getBlock().getLocalBlock();
 
     DataNode dn = cluster.getDataNodes().get(dnIndex);
-    // disable the heartbeat from DN so that the invalidated block record is kept
-    // in NameNode until heartbeat expires and NN mark the dn as dead
+    // disable the heartbeat from DN so that the invalidated block record is
+    // kept in NameNode until heartbeat expires and NN mark the dn as dead
     DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
 
     try {
       // delete the file
-      fs.delete(file, true);
+      dfs.delete(file, true);
       // check the block is added to invalidateBlocks
       final FSNamesystem fsn = cluster.getNamesystem();
       final BlockManager bm = fsn.getBlockManager();
-      DatanodeDescriptor dnd = NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
+      DatanodeDescriptor dnd =
+          NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
       Assert.assertTrue(bm.containsInvalidateBlock(
           blks[0].getLocations()[0], b) || dnd.containsInvalidateBlock(b));
     } finally {
       DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
     }
   }
-
-  /**
-   * Test reading a file with some blocks(data blocks or parity blocks or both)
-   * deleted or corrupted.
-   * @param src file path
-   * @param fileLength file length
-   * @param dataBlkDelNum the deleted or corrupted number of data blocks.
-   * @param parityBlkDelNum the deleted or corrupted number of parity blocks.
-   * @param deleteBlockFile whether block file is deleted or corrupted.
-   *                        true is to delete the block file.
-   *                        false is to corrupt the content of the block file.
-   * @throws IOException
-   */
-  private void testReadWithBlockCorrupted(String src, int fileLength,
-      int dataBlkDelNum, int parityBlkDelNum, boolean deleteBlockFile)
-      throws IOException {
-    LOG.info("testReadWithBlockCorrupted: file = " + src
-        + ", dataBlkDelNum = " + dataBlkDelNum
-        + ", parityBlkDelNum = " + parityBlkDelNum
-        + ", deleteBlockFile? " + deleteBlockFile);
-    int recoverBlkNum = dataBlkDelNum + parityBlkDelNum;
-    Assert.assertTrue("dataBlkDelNum and parityBlkDelNum should be positive",
-        dataBlkDelNum >= 0 && parityBlkDelNum >= 0);
-    Assert.assertTrue("The sum of dataBlkDelNum and parityBlkDelNum " +
-        "should be between 1 ~ " + parityBlocks, recoverBlkNum <= parityBlocks);
-
-    // write a file with the length of writeLen
-    Path srcPath = new Path(src);
-    final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength);
-    DFSTestUtil.writeFile(fs, srcPath, bytes);
-
-    // delete or corrupt some blocks
-    corruptBlocks(srcPath, dataBlkDelNum, parityBlkDelNum, deleteBlockFile);
-
-    // check the file can be read after some blocks were deleted
-    verifyRead(srcPath, fileLength, bytes);
-  }
-
-  private void corruptBlocks(Path srcPath, int dataBlkDelNum,
-      int parityBlkDelNum, boolean deleteBlockFile) throws IOException {
-    int recoverBlkNum = dataBlkDelNum + parityBlkDelNum;
-
-    LocatedBlocks locatedBlocks = getLocatedBlocks(srcPath);
-    LocatedStripedBlock lastBlock =
-        (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
-
-    int[] delDataBlkIndices = StripedFileTestUtil.randomArray(0, dataBlocks,
-        dataBlkDelNum);
-    Assert.assertNotNull(delDataBlkIndices);
-    int[] delParityBlkIndices = StripedFileTestUtil.randomArray(dataBlocks,
-        dataBlocks + parityBlocks, parityBlkDelNum);
-    Assert.assertNotNull(delParityBlkIndices);
-
-    int[] delBlkIndices = new int[recoverBlkNum];
-    System.arraycopy(delDataBlkIndices, 0,
-        delBlkIndices, 0, delDataBlkIndices.length);
-    System.arraycopy(delParityBlkIndices, 0,
-        delBlkIndices, delDataBlkIndices.length, delParityBlkIndices.length);
-
-    ExtendedBlock[] delBlocks = new ExtendedBlock[recoverBlkNum];
-    for (int i = 0; i < recoverBlkNum; i++) {
-      delBlocks[i] = StripedBlockUtil
-          .constructInternalBlock(lastBlock.getBlock(),
-              cellSize, dataBlocks, delBlkIndices[i]);
-      if (deleteBlockFile) {
-        // delete the block file
-        cluster.corruptBlockOnDataNodesByDeletingBlockFile(delBlocks[i]);
-      } else {
-        // corrupt the block file
-        cluster.corruptBlockOnDataNodes(delBlocks[i]);
-      }
-    }
-  }
-
-  private LocatedBlocks getLocatedBlocks(Path filePath) throws IOException {
-    return fs.getClient().getLocatedBlocks(filePath.toString(),
-        0, Long.MAX_VALUE);
-  }
 }
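Likewise, the four-argument `findFirstDataNode` used in the last two hunks replaces the private single-filesystem version deleted in the first hunk, and presumably lives in the same helper class. A sketch under that assumption, with the body taken from the removed method and the cluster and file system passed in explicitly (this would also need `org.apache.hadoop.fs.BlockLocation`, `org.apache.hadoop.fs.Path`, and `org.apache.hadoop.hdfs.server.datanode.DataNode` imported into the helper sketched after the first hunk):

  // Hypothetical companion to the helper sketch above: the removed private
  // findFirstDataNode, generalized so callers supply the cluster and fs.
  // Returns the index of the DataNode holding the file's first block, or -1.
  public static int findFirstDataNode(MiniDFSCluster cluster,
      DistributedFileSystem dfs, Path file, long length) throws IOException {
    BlockLocation[] locs = dfs.getFileBlockLocations(file, 0, length);
    String name = (locs[0].getNames())[0];
    int dnIndex = 0;
    for (DataNode dn : cluster.getDataNodes()) {
      // Block locations are host:xferPort strings, so match on the port.
      if (name.contains(Integer.toString(dn.getXferPort()))) {
        return dnIndex;
      }
      dnIndex++;
    }
    return -1;
  }

Both call sites in this patch assert that the returned index is not -1 before using it, so the -1 sentinel is preserved here.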