@@ -23,6 +23,7 @@ import static org.junit.Assume.assumeTrue;

 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
@@ -34,6 +35,12 @@ import java.util.Random;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;

+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingTestHelper;
+import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -575,4 +582,237 @@ public class TestReconstructStripedFile {
       }
     }
   }
+
+  /**
+   * When a StripedBlockReader times out, its outdated future should be
+   * ignored. Otherwise an NPE is thrown, which stops the reading of the
+   * remaining data and fails the reconstruction task.
+   */
+  @Test(timeout = 120000)
+  public void testTimeoutReadBlockInReconstruction() throws Exception {
+    assumeTrue("Ignore case where num parity units <= 1",
+        ecPolicy.getNumParityUnits() > 1);
+    int stripedBufferSize = conf.getInt(
+        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
+        cellSize);
+    ErasureCodingPolicy policy = ecPolicy;
+    fs.enableErasureCodingPolicy(policy.getName());
+    fs.getClient().setErasureCodingPolicy("/", policy.getName());
+
+    // StripedBlockReconstructor#reconstruct will loop twice
+    final int fileLen = stripedBufferSize * 2 * ecPolicy.getNumDataUnits();
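+    // Each loop reads up to stripedBufferSize bytes from every data unit,
+    // so this file length spans exactly two reconstruction passes.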
+    String fileName = "/timeout-read-block";
+    Path file = new Path(fileName);
+    writeFile(fs, fileName, fileLen);
+    fs.getFileBlockLocations(file, 0, fileLen);
+
+    LocatedBlocks locatedBlocks =
+        StripedFileTestUtil.getLocatedBlocks(file, fs);
+    Assert.assertEquals(1, locatedBlocks.getLocatedBlocks().size());
+    // The file only has one block group
+    LocatedBlock lblock = locatedBlocks.get(0);
+    DatanodeInfo[] datanodeinfos = lblock.getLocations();
+
+    // Shutting this DataNode down later triggers reconstruction of the
+    // first block
+    DataNode dataNode = cluster.getDataNode(datanodeinfos[0].getIpcPort());
+
+    int stripedReadTimeoutInMills = conf.getInt(
+        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY,
+        DFSConfigKeys.
+            DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT);
+    Assert.assertTrue(
+        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY
+            + " must be greater than 2000",
+        stripedReadTimeoutInMills > 2000);
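+    // The injected delays below wait for up to 3x this timeout; the
+    // assertion above guards against configured values too small for that
+    // coordination to work reliably.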
+
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector timeoutInjector = new DataNodeFaultInjector() {
+      private AtomicInteger numDelayReader = new AtomicInteger(0);
+
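+      // Note: delayBlockReader() is invoked once per striped block read,
+      // so numDelayReader counts issued reads. Stalling the first reader
+      // until numDataUnits + 1 reads have been issued guarantees that its
+      // future has already timed out and a replacement read has been
+      // scheduled from another source.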
+      @Override
+      public void delayBlockReader() {
+        int index = numDelayReader.incrementAndGet();
+        LOG.info("Delay the {}th read block", index);
+
+        // In the file's first StripedBlockReconstructor#reconstruct loop,
+        // the first reader will time out
+        if (index == 1) {
+          try {
+            GenericTestUtils.waitFor(() -> numDelayReader.get() >=
+                ecPolicy.getNumDataUnits() + 1, 50,
+                stripedReadTimeoutInMills * 3
+            );
+          } catch (TimeoutException e) {
+            Assert.fail("Can't reconstruct the file's first part.");
+          } catch (InterruptedException e) {
+            // ignored; the injected delay is best-effort
+          }
+        }
+        // Park all subsequent re-reconstruction tasks forever so they
+        // cannot interfere with the block group under test
+        if (index > 3 * ecPolicy.getNumDataUnits() + 1) {
+          while (true) {
+            try {
+              Thread.sleep(1000);
+            } catch (InterruptedException e) {
+              // ignored; keep this reader parked
+            }
+          }
+        }
+      }
+    };
+    DataNodeFaultInjector.set(timeoutInjector);
+
+    try {
+      shutdownDataNode(dataNode);
+      // Before HDFS-15240, an NPE here made the reconstruction fail
+      // (and this test time out)
+      StripedFileTestUtil
+          .waitForReconstructionFinished(file, fs, groupSize);
+    } finally {
+      DataNodeFaultInjector.set(oldInjector);
+    }
+  }
+
+  /**
+   * When the block reader times out, its outdated future should be
+   * ignored. Otherwise the ByteBuffer would be written to after being
+   * given back to the buffer pool. This test ensures that the block
+   * reader is closed before its buffer is freed.
+   */
+  @Test(timeout = 120000)
+  public void testAbnormallyCloseDoesNotWriteBufferAgain() throws Exception {
+    assumeTrue("Ignore case where num parity units <= 1",
+        ecPolicy.getNumParityUnits() > 1);
+    int stripedBufferSize = conf.getInt(
+        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
+        cellSize);
+    // StripedBlockReconstructor#reconstruct will loop twice
+    final int fileLen = stripedBufferSize * 2 * ecPolicy.getNumDataUnits();
+    String fileName = "/no-dirty-buffer";
+    Path file = new Path(fileName);
+    writeFile(fs, fileName, fileLen);
+    fs.getFileBlockLocations(file, 0, fileLen);
+
+    LocatedBlocks locatedBlocks =
+        StripedFileTestUtil.getLocatedBlocks(file, fs);
+    Assert.assertEquals(1, locatedBlocks.getLocatedBlocks().size());
+    // The file only has one block group
+    LocatedBlock lblock = locatedBlocks.get(0);
+    DatanodeInfo[] datanodeinfos = lblock.getLocations();
+
+    // Shutting this DataNode down later triggers reconstruction of the
+    // first block
+    DataNode dataNode = cluster.getDataNode(datanodeinfos[0].getIpcPort());
+
+    int stripedReadTimeoutInMills = conf.getInt(
+        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY,
+        DFSConfigKeys.
+            DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT);
+    Assert.assertTrue(
+        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY
+            + " must be greater than 2000",
+        stripedReadTimeoutInMills > 2000);
+
+    ElasticByteBufferPool bufferPool =
+        (ElasticByteBufferPool) ErasureCodingTestHelper.getBufferPool();
+    emptyBufferPool(bufferPool, true);
+    emptyBufferPool(bufferPool, false);
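+    // Drain both the direct and the on-heap pools up front, so the
+    // emptiness checks at the end only see buffers released by this
+    // reconstruction.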
+
+    AtomicInteger finishedReadBlock = new AtomicInteger(0);
+
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector timeoutInjector = new DataNodeFaultInjector() {
+      private AtomicInteger numDelayReader = new AtomicInteger(0);
+      private AtomicBoolean continueRead = new AtomicBoolean(false);
+      private AtomicBoolean closeByNPE = new AtomicBoolean(false);
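+      // numDelayReader counts issued reads; closeByNPE records that at
+      // least one reader buffer has been freed; continueRead releases the
+      // held readers once numDataUnits + 1 buffers have been freed.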
+
+      @Override
+      public void delayBlockReader() {
+        int index = numDelayReader.incrementAndGet();
+        LOG.info("Delay the {}th read block", index);
+
+        // In the file's first StripedBlockReconstructor#reconstruct loop,
+        // the first reader will time out
+        if (index == 1) {
+          try {
+            GenericTestUtils.waitFor(() -> numDelayReader.get() >=
+                ecPolicy.getNumDataUnits() + 1, 50,
+                stripedReadTimeoutInMills * 3
+            );
+          } catch (TimeoutException e) {
+            Assert.fail("Can't reconstruct the file's first part.");
+          } catch (InterruptedException e) {
+            // ignored; the injected delay is best-effort
+          }
+        }
+        // Hold each later reader until either the previous read has
+        // finished or, once buffers have started being freed, until
+        // continueRead is set
+        if (index > ecPolicy.getNumDataUnits() + 1) {
+          try {
+            GenericTestUtils.waitFor(
+                () -> {
+                  LOG.info("Close by NPE: {}, continue read: {}",
+                      closeByNPE, continueRead);
+                  return closeByNPE.get() ? continueRead.get()
+                      : index == finishedReadBlock.get() + 1;
+                }, 5,
+                stripedReadTimeoutInMills * 3
+            );
+          } catch (TimeoutException e) {
+            Assert.fail("Can't reconstruct the file's remaining part.");
+          } catch (InterruptedException e) {
+            // ignored; the injected delay is best-effort
+          }
+        }
+      }
+
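+      // finishedReadBlock counts completed reads. The total reaches
+      // 2 * numDataUnits + 1 because the timed-out reader is replaced by
+      // one extra read, and the abandoned read still finishes late.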
+      @Override
+      public void interceptBlockReader() {
+        int n = finishedReadBlock.incrementAndGet();
+        LOG.info("Intercept the end of the {}th read block.", n);
+      }
+
+      private AtomicInteger numFreeBuffer = new AtomicInteger(0);
+
+      @Override
+      public void interceptFreeBlockReaderBuffer() {
+        closeByNPE.compareAndSet(false, true);
+        int num = numFreeBuffer.incrementAndGet();
+        LOG.info("Intercept the {}th freed block buffer.", num);
+        // Once the first loop's readers have freed their buffers, release
+        // the held readers and wait for the whole reconstruction to finish
+        if (num >= ecPolicy.getNumDataUnits() + 1) {
+          continueRead.compareAndSet(false, true);
+          try {
+            GenericTestUtils.waitFor(() -> finishedReadBlock.get() >=
+                2 * ecPolicy.getNumDataUnits() + 1, 50,
+                stripedReadTimeoutInMills * 3
+            );
+          } catch (TimeoutException e) {
+            Assert.fail("Can't finish the file's reconstruction.");
+          } catch (InterruptedException e) {
+            // ignored; the injected delay is best-effort
+          }
+        }
+      }
+    };
+    DataNodeFaultInjector.set(timeoutInjector);
+    try {
+      shutdownDataNode(dataNode);
+      // Reaching this count implies at least one reader timed out
+      GenericTestUtils.waitFor(() -> finishedReadBlock.get() >=
+          2 * ecPolicy.getNumDataUnits() + 1, 50,
+          stripedReadTimeoutInMills * 3
+      );
+
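+      // Every freed buffer is now back in the pool; the checks below
+      // verify each one's position is still 0, i.e. nothing wrote into a
+      // buffer after it had been returned to the pool.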
+      assertBufferPoolIsEmpty(bufferPool, false);
+      assertBufferPoolIsEmpty(bufferPool, true);
+      StripedFileTestUtil.waitForReconstructionFinished(file, fs, groupSize);
+    } finally {
+      DataNodeFaultInjector.set(oldInjector);
+    }
+  }
+
+  private void assertBufferPoolIsEmpty(ElasticByteBufferPool bufferPool,
+      boolean direct) {
+    while (bufferPool.size(direct) != 0) {
+      // Iterate all ByteBuffers in the ElasticByteBufferPool
+      ByteBuffer byteBuffer = bufferPool.getBuffer(direct, 0);
+      Assert.assertEquals(0, byteBuffer.position());
+    }
+  }
+
+  private void emptyBufferPool(ElasticByteBufferPool bufferPool,
+      boolean direct) {
+    while (bufferPool.size(direct) != 0) {
+      bufferPool.getBuffer(direct, 0);
+    }
+  }
 }