@@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
 import com.google.common.base.Supplier;

 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -52,7 +53,9 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils.MaterializedReplica;
 import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
@@ -155,6 +158,75 @@ public class TestReplication {
     assertTrue(!fileSys.exists(name));
   }

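+  /**
+   * An FsDataset whose block input streams fail as soon as block data
+   * is read, so a replica transferred from it can never arrive intact.
+   */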
+  private static class CorruptFileSimulatedFSDataset
+      extends SimulatedFSDataset {
+    /**
+     * Simulated input stream that raises an I/O error once block data
+     * has been read, mimicking a corrupt replica on disk.
+     */
+    private static class CorruptFileSimulatedInputStream
+        extends InputStream {
+      private InputStream inputStream;
+
+      CorruptFileSimulatedInputStream(InputStream is) {
+        inputStream = is;
+      }
+
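+      // Throw as soon as a (non-zero) data byte is read, simulating a
+      // bad disk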
+      @Override
+      public int read() throws IOException {
+        int ret = inputStream.read();
+        if (ret > 0) {
+          throw new IOException("Input/output error");
+        }
+        return ret;
+      }
+
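+      // Same failure mode for bulk reads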
+      @Override
+      public int read(byte[] b) throws IOException {
+        int ret = inputStream.read(b);
+        if (ret > 0) {
+          throw new IOException("Input/output error");
+        }
+        return ret;
+      }
+    }
+
+    CorruptFileSimulatedFSDataset(DataNode datanode, DataStorage storage,
+        Configuration conf) {
+      super(storage, conf);
+    }
+
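+    // Hand out the failing stream so a block transfer from this
+    // dataset can never deliver a healthy replica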
+    @Override
+    public synchronized InputStream getBlockInputStream(ExtendedBlock b,
+        long seekOffset) throws IOException {
+      InputStream result = super.getBlockInputStream(b);
+      IOUtils.skipFully(result, seekOffset);
+      return new CorruptFileSimulatedInputStream(result);
+    }
+
+    static class Factory
+        extends FsDatasetSpi.Factory<CorruptFileSimulatedFSDataset> {
+      @Override
+      public CorruptFileSimulatedFSDataset newInstance(DataNode datanode,
+          DataStorage storage, Configuration conf) throws IOException {
+        return new CorruptFileSimulatedFSDataset(datanode, storage, conf);
+      }
+
+      @Override
+      public boolean isSimulated() {
+        return true;
+      }
+    }
+  }
+
   private void testBadBlockReportOnTransfer(
       boolean corruptBlockByDeletingBlockFile) throws Exception {
     Configuration conf = new HdfsConfiguration();
@@ -206,6 +278,56 @@ public class TestReplication {
     cluster.shutdown();
   }

+  @Test(timeout = 30000)
+  public void testBadBlockReportOnTransferCorruptFile() throws Exception {
+    Configuration conf = new HdfsConfiguration();
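+    // Plug the corrupting dataset in through the fsdataset factory key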
+    conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
+        CorruptFileSimulatedFSDataset.Factory.class.getName());
+    // Disable the block scanner so reportBadBlocks comes from the transfer
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1L);
+    FileSystem fs;
+    int replicaCount = 0;
+    short replFactor = 1;
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    cluster.waitActive();
+    try {
+      fs = cluster.getFileSystem();
+      final DFSClient dfsClient = new DFSClient(
+          new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
+
+      // Create a file with a replication factor of 1
+      Path file1 = new Path("/tmp/testBadBlockReportOnTransfer/file1");
+      DFSTestUtil.createFile(fs, file1, 1024, replFactor, 0);
+      DFSTestUtil.waitReplication(fs, file1, replFactor);
+
+      // Increase the replication factor; this should trigger a transfer.
+      // The receiving datanode fails the checksum and reports the bad block
+      replFactor = 2;
+      fs.setReplication(file1, replFactor);
+
+      // Wait for the namenode to mark the block corrupt
+      GenericTestUtils.waitFor(() -> {
+        try {
+          return dfsClient.getNamenode()
+              .getBlockLocations(file1.toString(), 0, Long.MAX_VALUE).get(0)
+              .isCorrupt();
+        } catch (IOException ie) {
+          return false;
+        }
+      }, 1000, 15000);
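+      // The corrupt copy is not listed as a valid location, so the
+      // block should still have only its one original replica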
+      replicaCount = dfsClient.getNamenode()
+          .getBlockLocations(file1.toString(), 0, Long.MAX_VALUE).get(0)
+          .getLocations().length;
+      assertEquals("replication should not succeed", 1, replicaCount);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /*
    * Test if Datanode reports bad blocks during replication request
    */