|
@@ -24,10 +24,12 @@ import static org.junit.Assert.fail;
|
|
|
import java.io.File;
|
|
|
import java.io.FileNotFoundException;
|
|
|
import java.io.IOException;
|
|
|
+import java.io.OutputStream;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.util.EnumSet;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
|
+import java.util.concurrent.TimeoutException;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.BlockLocation;
|
|
@@ -44,10 +46,16 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
|
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
|
|
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
|
|
|
+import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
|
|
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
|
|
|
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
|
+import org.apache.hadoop.util.DataChecksum;
|
|
|
import org.apache.hadoop.util.Time;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.Test;
|
|
@@ -61,6 +69,8 @@ public class TestFileAppend{
|
|
|
|
|
|
private static byte[] fileContents = null;
|
|
|
|
|
|
+ static final DataChecksum DEFAULT_CHECKSUM =
|
|
|
+ DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
|
|
|
//
|
|
|
// writes to file but does not close it
|
|
|
//
|
|
@@ -656,4 +666,63 @@ public class TestFileAppend{
|
|
|
cluster.shutdown();
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ @Test(timeout = 10000)
|
|
|
+ public void testConcurrentAppendRead()
|
|
|
+ throws IOException, TimeoutException, InterruptedException {
|
|
|
+ // Create a finalized replica and append to it
|
|
|
+ // Read block data and checksum. Verify checksum.
|
|
|
+ Configuration conf = new HdfsConfiguration();
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
|
|
|
+ conf.setInt("dfs.min.replication", 1);
|
|
|
+
|
|
|
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
|
|
+ try {
|
|
|
+ cluster.waitActive();
|
|
|
+ DataNode dn = cluster.getDataNodes().get(0);
|
|
|
+ FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
|
|
|
+
|
|
|
+ // create a file with 1 byte of data.
|
|
|
+ long initialFileLength = 1;
|
|
|
+ DistributedFileSystem fs = cluster.getFileSystem();
|
|
|
+ Path fileName = new Path("/appendCorruptBlock");
|
|
|
+ DFSTestUtil.createFile(fs, fileName, initialFileLength, (short) 1, 0);
|
|
|
+ DFSTestUtil.waitReplication(fs, fileName, (short) 1);
|
|
|
+ Assert.assertTrue("File not created", fs.exists(fileName));
|
|
|
+
|
|
|
+ // Call FsDatasetImpl#append to append the block file,
|
|
|
+ // which converts it to a rbw replica.
|
|
|
+ ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName);
|
|
|
+ long newGS = block.getGenerationStamp() + 1;
|
|
|
+ ReplicaHandler replicaHandler =
|
|
|
+ dataSet.append(block, newGS, initialFileLength);
|
|
|
+
|
|
|
+ // write data to block file
|
|
|
+ ReplicaBeingWritten rbw =
|
|
|
+ (ReplicaBeingWritten) replicaHandler.getReplica();
|
|
|
+ ReplicaOutputStreams outputStreams =
|
|
|
+ rbw.createStreams(false, DEFAULT_CHECKSUM);
|
|
|
+ OutputStream dataOutput = outputStreams.getDataOut();
|
|
|
+
|
|
|
+ byte[] appendBytes = new byte[1];
|
|
|
+ dataOutput.write(appendBytes, 0, 1);
|
|
|
+ dataOutput.flush();
|
|
|
+ dataOutput.close();
|
|
|
+
|
|
|
+ // update checksum file
|
|
|
+ final int smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
|
|
|
+ FsDatasetUtil.computeChecksum(rbw.getMetaFile(), rbw.getMetaFile(),
|
|
|
+ rbw.getBlockFile(), smallBufferSize, conf);
|
|
|
+
|
|
|
+ // read the block
|
|
|
+ // the DataNode BlockSender should read from the rbw replica's in-memory
|
|
|
+ // checksum, rather than on-disk checksum. Otherwise it will see a
|
|
|
+ // checksum mismatch error.
|
|
|
+ final byte[] readBlock = DFSTestUtil.readFileBuffer(fs, fileName);
|
|
|
+ assertEquals("should have read only one byte!", 1, readBlock.length);
|
|
|
+ } finally {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|