@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
 import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
 import org.apache.hadoop.security.token.Token;
@@ -77,30 +79,29 @@ public class TestDFSStripedOutputStreamWithFailure {
         .getLogger().setLevel(Level.ALL);
   }
 
+  private final int cellSize = 64 * 1024; //64k
+  private final int stripesPerBlock = 4;
   private ErasureCodingPolicy ecPolicy;
   private int dataBlocks;
   private int parityBlocks;
-  private int cellSize;
-  private final int stripesPerBlock = 4;
   private int blockSize;
   private int blockGroupSize;
 
   private static final int FLUSH_POS =
       9 * DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT + 1;
 
-  public ErasureCodingPolicy getEcPolicy() {
-    return StripedFileTestUtil.getDefaultECPolicy();
+  public ECSchema getEcSchema() {
+    return StripedFileTestUtil.getDefaultECPolicy().getSchema();
   }
 
   /*
    * Initialize erasure coding policy.
    */
   @Before
-  public void init(){
-    ecPolicy = getEcPolicy();
+  public void init() {
+    ecPolicy = new ErasureCodingPolicy(getEcSchema(), cellSize);
     dataBlocks = ecPolicy.getNumDataUnits();
     parityBlocks = ecPolicy.getNumParityUnits();
-    cellSize = ecPolicy.getCellSize();
     blockSize = cellSize * stripesPerBlock;
     blockGroupSize = blockSize * dataBlocks;
     dnIndexSuite = getDnIndexSuite();
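
The hunk above switches the test from taking a ready-made policy to building one from a schema plus an explicit 64 KiB cell size. The sketch below shows the same constructor pattern in isolation; it is illustrative only, uses a hand-built RS(6,3) schema instead of the test's StripedFileTestUtil.getDefaultECPolicy().getSchema(), and the class name is hypothetical.

    import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
    import org.apache.hadoop.io.erasurecode.ECSchema;

    // Hypothetical helper class, not part of the patch.
    public class CustomCellSizePolicySketch {
      public static void main(String[] args) {
        // Assumed schema: Reed-Solomon with 6 data and 3 parity units.
        ECSchema schema = new ECSchema("rs", 6, 3);
        int cellSize = 64 * 1024; // 64k, matching the new test field

        // Same constructor the patched init() uses; the policy stays local
        // until the NameNode registers it (see the setup() hunk below).
        ErasureCodingPolicy policy = new ErasureCodingPolicy(schema, cellSize);

        System.out.println(policy.getName()
            + ": dataUnits=" + policy.getNumDataUnits()
            + ", parityUnits=" + policy.getNumParityUnits()
            + ", cellSize=" + policy.getCellSize());
      }
    }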
@@ -189,7 +190,7 @@ public class TestDFSStripedOutputStreamWithFailure {
   private List<Integer> lengths;
 
   Integer getLength(int i) {
-    return i >= 0 && i < lengths.size()? lengths.get(i): null;
+    return i >= 0 && i < lengths.size() ? lengths.get(i): null;
   }
 
   private static final Random RANDOM = new Random();
@@ -220,6 +221,10 @@ public class TestDFSStripedOutputStreamWithFailure {
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.waitActive();
     dfs = cluster.getFileSystem();
+    AddErasureCodingPolicyResponse[] res =
+        dfs.addErasureCodingPolicies(new ErasureCodingPolicy[]{ecPolicy});
+    ecPolicy = res[0].getPolicy();
+    dfs.enableErasureCodingPolicy(ecPolicy.getName());
     DFSTestUtil.enableAllECPolicies(dfs);
     dfs.mkdirs(dir);
     dfs.setErasureCodingPolicy(dir, ecPolicy.getName());
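
The four added lines above follow DistributedFileSystem's register-then-enable flow for a user-defined policy. A minimal sketch of that flow, assuming a DistributedFileSystem handle and a policy built as in init(); the helper class name and the error handling are illustrative, not part of the patch.

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
    import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;

    // Hypothetical helper, not part of the patch.
    public final class EcPolicyRegistrationSketch {
      public static ErasureCodingPolicy registerEnableAndApply(
          DistributedFileSystem dfs, ErasureCodingPolicy policy, Path dir)
          throws IOException {
        // 1. Register the policy; the NameNode returns the canonical instance,
        //    which is why the test reassigns ecPolicy from the response.
        AddErasureCodingPolicyResponse[] res =
            dfs.addErasureCodingPolicies(new ErasureCodingPolicy[]{policy});
        if (!res[0].isSucceed()) {
          throw new IOException("add policy failed: " + res[0].getErrorMsg());
        }
        ErasureCodingPolicy registered = res[0].getPolicy();

        // 2. A registered policy still has to be enabled before use.
        dfs.enableErasureCodingPolicy(registered.getName());

        // 3. Apply it to a directory so files created under it are striped.
        dfs.setErasureCodingPolicy(dir, registered.getName());
        return registered;
      }

      private EcPolicyRegistrationSketch() {
      }
    }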
@@ -241,7 +246,7 @@ public class TestDFSStripedOutputStreamWithFailure {
     return conf;
   }
 
-  @Test(timeout=240000)
+  @Test(timeout=300000)
   public void testMultipleDatanodeFailure56() throws Exception {
     runTestWithMultipleFailure(getLength(56));
   }
@@ -260,7 +265,8 @@ public class TestDFSStripedOutputStreamWithFailure {
 
   @Test(timeout=240000)
   public void testBlockTokenExpired() throws Exception {
-    final int length = dataBlocks * (blockSize - cellSize);
+    // Make sure killPos is greater than the length of one stripe
+    final int length = dataBlocks * cellSize * 3;
     final HdfsConfiguration conf = newHdfsConfiguration();
 
     conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
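
Since blockSize is defined as cellSize * stripesPerBlock (= 4 * cellSize), the old and new expressions both equal three full stripes; the rewrite simply states that directly and documents the killPos requirement. A small worked example, assuming the default RS(6,3) schema the test pulls from StripedFileTestUtil (so dataBlocks = 6); the class name is illustrative.

    // Hypothetical illustration, not part of the patch.
    public class StripeLengthSketch {
      public static void main(String[] args) {
        int dataBlocks = 6;                      // assumed RS(6,3) default schema
        int cellSize = 64 * 1024;                // 64k, as in the test
        int oneStripe = dataBlocks * cellSize;   // 393,216 bytes (384 KiB)
        int length = dataBlocks * cellSize * 3;  // 1,179,648 bytes (three stripes)
        System.out.println("one stripe = " + oneStripe + ", length = " + length);
        // Any kill position derived from this length can fall past the first stripe.
      }
    }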
@@ -300,13 +306,13 @@ public class TestDFSStripedOutputStreamWithFailure {
     cluster.triggerHeartbeats();
     DatanodeInfo[] info = dfs.getClient().datanodeReport(
         DatanodeReportType.LIVE);
-    assertEquals("Mismatches number of live Dns ", numDatanodes, info.length);
+    assertEquals("Mismatches number of live Dns", numDatanodes, info.length);
     final Path dirFile = new Path(dir, "ecfile");
     LambdaTestUtils.intercept(
         IOException.class,
         "File " + dirFile + " could only be written to " +
             numDatanodes + " of the " + dataBlocks + " required nodes for " +
-            getEcPolicy().getName(),
+            ecPolicy.getName(),
         () -> {
           try (FSDataOutputStream out = dfs.create(dirFile, true)) {
             out.write("something".getBytes());
@@ -413,7 +419,7 @@ public class TestDFSStripedOutputStreamWithFailure {
     cluster.triggerHeartbeats();
     DatanodeInfo[] info = dfs.getClient().datanodeReport(
         DatanodeReportType.LIVE);
-    assertEquals("Mismatches number of live Dns ", numDatanodes, info.length);
+    assertEquals("Mismatches number of live Dns", numDatanodes, info.length);
     Path srcPath = new Path(dir, "testAddBlockWhenNoSufficientParityNodes");
     int fileLength = cellSize - 1000;
     final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
@@ -432,7 +438,7 @@ public class TestDFSStripedOutputStreamWithFailure {
     try {
       LOG.info("runTest: dn=" + dn + ", length=" + length);
       setup(conf);
-      runTest(length, new int[]{length/2}, new int[]{dn}, false);
+      runTest(length, new int[]{length / 2}, new int[]{dn}, false);
     } catch (Throwable e) {
       final String err = "failed, dn=" + dn + ", length=" + length
           + StringUtils.stringifyException(e);
@@ -582,10 +588,10 @@ public class TestDFSStripedOutputStreamWithFailure {
     long oldGS = -1; // the old GS before bumping
     List<Long> gsList = new ArrayList<>();
     final List<DatanodeInfo> killedDN = new ArrayList<>();
-    int numKilled=0;
+    int numKilled = 0;
     for(; pos.get() < length;) {
       final int i = pos.getAndIncrement();
-      if (numKilled < killPos.length && i == killPos[numKilled]) {
+      if (numKilled < killPos.length && i == killPos[numKilled]) {
         assertTrue(firstGS != -1);
         final long gs = getGenerationStamp(stripedOut);
         if (numKilled == 0) {
@@ -706,8 +712,6 @@ public class TestDFSStripedOutputStreamWithFailure {
 
   private void run(int offset) {
     int base = getBase();
-    // TODO: Fix and re-enable these flaky tests. See HDFS-12417.
-    assumeTrue("Test has been temporarily disabled. See HDFS-12417.", false);
     assumeTrue(base >= 0);
     final int i = offset + base;
     final Integer length = getLength(i);