@@ -30,6 +30,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,6 +47,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
@@ -53,6 +56,7 @@ import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
 import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Assert;
@@ -488,4 +492,64 @@ public class TestReconstructStripedFile {
         500, 30000
     );
   }
+
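+  // Each reconstruction task is expected to count as (weight * numDataUnits)
+  // xmits, with a floor of 1: the default weight of 1 costs numDataUnits
+  // xmits per task, weight 0 still costs 1, and weight 10 costs 10x.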
+  @Test(timeout = 180000)
+  public void testErasureCodingWorkerXmitsWeight() throws Exception {
+    testErasureCodingWorkerXmitsWeight(1f, ecPolicy.getNumDataUnits());
+    testErasureCodingWorkerXmitsWeight(0f, 1);
+    testErasureCodingWorkerXmitsWeight(10f, 10 * ecPolicy.getNumDataUnits());
+  }
+
+  private void testErasureCodingWorkerXmitsWeight(
+      float weight, int expectedWeight)
+      throws Exception {
+
+    // Reset cluster with customized xmits weight.
+    conf.setFloat(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_KEY,
+        weight);
+    cluster.shutdown();
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dnNum).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    fs.enableErasureCodingPolicy(
+        StripedFileTestUtil.getDefaultECPolicy().getName());
+    fs.getClient().setErasureCodingPolicy("/",
+        StripedFileTestUtil.getDefaultECPolicy().getName());
+
+    final int fileLen = cellSize * ecPolicy.getNumDataUnits() * 2;
+    writeFile(fs, "/ec-xmits-weight", fileLen);
+
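+    // Each finalized block on the DataNode that is stopped below triggers one
+    // reconstruction task on the remaining DataNodes.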
+    DataNode dn = cluster.getDataNodes().get(0);
+    int corruptBlocks = dn.getFSDataset().getFinalizedBlocks(
+        cluster.getNameNode().getNamesystem().getBlockPoolId()).size();
+    int expectedXmits = corruptBlocks * expectedWeight;
+
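+    // Hold every reconstruction task on a barrier via the fault injector so
+    // their xmits stay in progress long enough to be observed.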
+    final CyclicBarrier barrier = new CyclicBarrier(corruptBlocks + 1);
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector delayInjector = new DataNodeFaultInjector() {
+      public void stripedBlockReconstruction() throws IOException {
+        try {
+          barrier.await();
+        } catch (InterruptedException | BrokenBarrierException e) {
+          throw new IOException(e);
+        }
+      }
+    };
+    DataNodeFaultInjector.set(delayInjector);
+
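+    // Stopping the DataNode kicks off EC reconstruction; poll until the
+    // cluster-wide xmits in progress reach the expected total.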
+    try {
+      shutdownDataNode(dn);
+      LambdaTestUtils.await(30 * 1000, 500,
+          () -> {
+            int totalXmits = cluster.getDataNodes().stream()
+                .mapToInt(DataNode::getXmitsInProgress).sum();
+            return totalXmits == expectedXmits;
+          }
+      );
+    } finally {
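+      // Release the blocked reconstruction tasks and restore the injector.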
+      barrier.await();
+      DataNodeFaultInjector.set(oldInjector);
+    }
+  }
 }