|
@@ -21,7 +21,9 @@ import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.commons.logging.impl.Log4JLogger;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.hdfs.AppendTestUtil;
|
|
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
|
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
@@ -30,19 +32,24 @@ import org.apache.hadoop.hdfs.protocol.Block;
|
|
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
|
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
|
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
|
|
+import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
|
+import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
|
|
|
import org.apache.log4j.Level;
|
|
|
import org.junit.After;
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
+import org.mockito.Mockito;
|
|
|
+import org.mockito.invocation.InvocationOnMock;
|
|
|
|
|
|
import java.io.File;
|
|
|
import java.io.FilenameFilter;
|
|
@@ -50,6 +57,7 @@ import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
import java.util.Random;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
|
|
|
/**
|
|
|
* This test simulates a variety of situations when blocks are being
|
|
@@ -491,6 +499,84 @@ public class TestBlockReport {
|
|
|
resetConfiguration(); // return the initial state of the configuration
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test for the case where one of the DNs in the pipeline is in the
|
|
|
+ * process of doing a block report exactly when the block is closed.
|
|
|
+ * In this case, the block report becomes delayed until after the
|
|
|
+ * block is marked completed on the NN, and hence it reports an RBW
|
|
|
+ * replica for a COMPLETE block. Such a report should not be marked
|
|
|
+ * corrupt.
|
|
|
+ * This is a regression test for HDFS-2791.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testOneReplicaRbwReportArrivesAfterBlockCompleted() throws Exception {
|
|
|
+ final CountDownLatch brFinished = new CountDownLatch(1);
|
|
|
+ DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG) {
|
|
|
+ @Override
|
|
|
+ protected Object passThrough(InvocationOnMock invocation)
|
|
|
+ throws Throwable {
|
|
|
+ try {
|
|
|
+ return super.passThrough(invocation);
|
|
|
+ } finally {
|
|
|
+ // inform the test that our block report went through.
|
|
|
+ brFinished.countDown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
|
|
|
+ Path filePath = new Path("/" + METHOD_NAME + ".dat");
|
|
|
+
|
|
|
+ // Start a second DN for this test -- we're checking
|
|
|
+ // what happens when one of the DNs is slowed for some reason.
|
|
|
+ REPL_FACTOR = 2;
|
|
|
+ startDNandWait(null, false);
|
|
|
+
|
|
|
+ NameNode nn = cluster.getNameNode();
|
|
|
+
|
|
|
+ FSDataOutputStream out = fs.create(filePath, REPL_FACTOR);
|
|
|
+ try {
|
|
|
+ AppendTestUtil.write(out, 0, 10);
|
|
|
+ out.hflush();
|
|
|
+
|
|
|
+ // Set up a spy so that we can delay the block report coming
|
|
|
+ // from this node.
|
|
|
+ DataNode dn = cluster.getDataNodes().get(0);
|
|
|
+ DatanodeProtocolClientSideTranslatorPB spy =
|
|
|
+ DataNodeAdapter.spyOnBposToNN(dn, nn);
|
|
|
+
|
|
|
+ Mockito.doAnswer(delayer)
|
|
|
+ .when(spy).blockReport(
|
|
|
+ Mockito.<DatanodeRegistration>anyObject(),
|
|
|
+ Mockito.anyString(),
|
|
|
+ Mockito.<long[]>anyObject());
|
|
|
+
|
|
|
+ // Force a block report to be generated. The block report will have
|
|
|
+ // an RBW replica in it. Wait for the RPC to be sent, but block
|
|
|
+ // it before it gets to the NN.
|
|
|
+ dn.scheduleAllBlockReport(0);
|
|
|
+ delayer.waitForCall();
|
|
|
+
|
|
|
+ } finally {
|
|
|
+ IOUtils.closeStream(out);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Now that the stream is closed, the NN will have the block in COMPLETE
|
|
|
+ // state.
|
|
|
+ delayer.proceed();
|
|
|
+ brFinished.await();
|
|
|
+
|
|
|
+ // Verify that no replicas are marked corrupt, and that the
|
|
|
+ // file is still readable.
|
|
|
+ BlockManagerTestUtil.updateState(nn.getNamesystem().getBlockManager());
|
|
|
+ assertEquals(0, nn.getNamesystem().getCorruptReplicaBlocks());
|
|
|
+ DFSTestUtil.readFile(fs, filePath);
|
|
|
+
|
|
|
+ // Ensure that the file is readable even from the DN that we futzed with.
|
|
|
+ cluster.stopDataNode(1);
|
|
|
+ DFSTestUtil.readFile(fs, filePath);
|
|
|
+ }
|
|
|
|
|
|
private void waitForTempReplica(Block bl, int DN_N1) throws IOException {
|
|
|
final boolean tooLongWait = false;
|