@@ -17,36 +17,46 @@
  */
 package org.apache.hadoop.hdfs;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.EnumSet;
+import java.util.function.Supplier;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestInterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.DataChecksum;
 import org.junit.After;
 import org.junit.Test;
@@ -351,7 +361,15 @@ public class TestLeaseRecovery {
     String file = "/test/f1";
     Path filePath = new Path(file);
 
-    createCommittedNotCompleteFile(client, file);
+    createCommittedNotCompleteFile(client, file, null, 1);
+
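+    // The file is committed but not complete, so the NameNode should still
+    // consider it under construction, with its single block attached.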
+    INodeFile inode = cluster.getNamesystem().getFSDirectory()
+        .getINode(filePath.toString()).asFile();
+    assertTrue(inode.isUnderConstruction());
+    assertEquals(1, inode.numBlocks());
+    assertNotNull(inode.getLastBlock());
 
     // Ensure a different client cannot append the file
     try {
@@ -361,9 +377,21 @@ public class TestLeaseRecovery {
       assertTrue(e.getMessage().contains("file lease is currently owned"));
     }
 
-    // Ensure the lease can be recovered on the first try
-    boolean recovered = client.recoverLease(file);
-    assertEquals(true, recovered);
+    // Lease will not be recovered on the first try
+    assertEquals(false, client.recoverLease(file));
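+    // Recovery completes asynchronously; poll until recoverLease succeeds.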
+    for (int i = 0; i < 10 && !client.recoverLease(file); i++) {
+      Thread.sleep(1000);
+    }
+    assertTrue(client.recoverLease(file));
+
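+    // Once recovered, the committed block that was never written to a DN
+    // should have been dropped from the file.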
+    inode = cluster.getNamesystem().getFSDirectory()
+        .getINode(filePath.toString()).asFile();
+    assertTrue(!inode.isUnderConstruction());
+    assertEquals(0, inode.numBlocks());
+    assertNull(inode.getLastBlock());
 
     // Ensure the recovered file can now be written
     FSDataOutputStream append = dfs.append(filePath);
@@ -395,7 +420,7 @@ public class TestLeaseRecovery {
         new DFSClient(cluster.getNameNode().getServiceRpcAddress(), conf);
     String file = "/test/f1";
 
-    createCommittedNotCompleteFile(client, file);
+    createCommittedNotCompleteFile(client, file, null, 1);
     waitLeaseRecovery(cluster);
 
     GenericTestUtils.waitFor(() -> {
@@ -415,23 +440,173 @@ public class TestLeaseRecovery {
     }
   }
 
-  private void createCommittedNotCompleteFile(DFSClient client, String file)
-      throws IOException {
+  @Test
+  public void testAbortedRecovery() throws Exception {
+    Configuration conf = new Configuration();
+    DFSClient client = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      client =
+          new DFSClient(cluster.getNameNode().getServiceRpcAddress(), conf);
+      final String file = "/test/f1";
+
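+      // Create the file directly through the NameNode RPC so that no
+      // DataNode pipeline is ever set up for its block.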
+      HdfsFileStatus stat = client.getNamenode()
+          .create(file, new FsPermission("777"), client.clientName,
+              new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)),
+              true, (short) 1, 1024 * 1024 * 128L,
+              new CryptoProtocolVersion[0], null, null);
+
+      assertNotNull(NameNodeAdapter.getLeaseHolderForPath(
+          cluster.getNameNode(), file));
+
+      // Add a block to the file
+      ExtendedBlock block = client.getNamenode().addBlock(
+          file, client.clientName, null, new DatanodeInfo[0], stat.getFileId(),
+          new String[0], null).getBlock();
+
+      // Update the pipeline to get a new genstamp.
+      ExtendedBlock updatedBlock = client.getNamenode()
+          .updateBlockForPipeline(block, client.clientName)
+          .getBlock();
+      // Fake that some data was maybe written; commitBlockSynchronization
+      // will reconcile.
+      updatedBlock.setNumBytes(1234);
+
+      // Get the stored block and make it look like the DN sent a RBW IBR.
+      BlockManager bm = cluster.getNamesystem().getBlockManager();
+      BlockInfo storedBlock = bm.getStoredBlock(block.getLocalBlock());
+      BlockUnderConstructionFeature uc =
+          storedBlock.getUnderConstructionFeature();
+      uc.setExpectedLocations(updatedBlock.getLocalBlock(),
+          uc.getExpectedStorageLocations(), BlockType.CONTIGUOUS);
+
+      // Complete the file without updatePipeline to simulate client failure.
+      client.getNamenode().complete(file, client.clientName, block,
+          stat.getFileId());
+
+      assertNotNull(NameNodeAdapter.getLeaseHolderForPath(
+          cluster.getNameNode(), file));
+
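+      // Expire the lease so the LeaseManager kicks off recovery immediately.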
+      cluster.setLeasePeriod(LEASE_PERIOD, LEASE_PERIOD);
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          String holder = NameNodeAdapter
+              .getLeaseHolderForPath(cluster.getNameNode(), file);
+          return holder == null;
+        }
+      }, 100, 20000);
+      // Nothing was actually written so the block should be dropped.
+      assertTrue(storedBlock.isDeleted());
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+        cluster = null;
+      }
+      if (client != null) {
+        client.close();
+      }
+    }
+  }
+
+  @Test
+  public void testLeaseManagerRecoversCommittedLastBlockWithContent()
+      throws Exception {
+    Configuration conf = new Configuration();
+    DFSClient client = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      client =
+          new DFSClient(cluster.getNameNode().getServiceRpcAddress(), conf);
+      String file = "/test/f2";
+
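+      // Write a single byte so the committed block holds real content on
+      // the DataNodes before the client "crashes".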
+      byte[] bytesToWrite = new byte[1];
+      bytesToWrite[0] = 123;
+      createCommittedNotCompleteFile(client, file, bytesToWrite, 3);
+
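+      // Force the lease to expire and give the NameNode time to recover it.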
+      waitLeaseRecovery(cluster);
+
+      DistributedFileSystem hdfs = cluster.getFileSystem();
+
+      // Now the lease has been recovered, attempt to append to the file and
+      // then ensure both the old and newly written data can be read back.
+      FSDataOutputStream op = null;
+      try {
+        op = hdfs.append(new Path(file));
+        op.write(23);
+      } finally {
+        if (op != null) {
+          op.close();
+        }
+      }
+
+      FSDataInputStream stream = null;
+      try {
+        stream = cluster.getFileSystem().open(new Path(file));
+        assertEquals(123, stream.readByte());
+        assertEquals(23, stream.readByte());
+      } finally {
+        stream.close();
+      }
+
+      // Finally, check there are no leases for the file and hence the file
+      // is closed.
+      GenericTestUtils.waitFor(() -> {
+        String holder = NameNodeAdapter
+            .getLeaseHolderForPath(cluster.getNameNode(), file);
+        return holder == null;
+      }, 100, 10000);
+
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+        cluster = null;
+      }
+      if (client != null) {
+        client.close();
+      }
+    }
+  }
+
+  private void createCommittedNotCompleteFile(DFSClient client, String file,
+      byte[] bytesToWrite, int repFactor) throws IOException {
     HdfsFileStatus stat = client.getNamenode()
-        .create(file, new FsPermission("777"), "test client",
+        .create(file, new FsPermission("777"), client.clientName,
             new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)),
-            true, (short) 1, 1024 * 1024 * 128L,
+            true, (short) repFactor, 1024 * 1024 * 128L,
             new CryptoProtocolVersion[0], null, null);
     // Add a block to the file
     LocatedBlock blk = client.getNamenode()
-        .addBlock(file, "test client", null,
+        .addBlock(file, client.clientName, null,
             new DatanodeInfo[0], stat.getFileId(), new String[0], null);
-    // Without writing anything to the file, or setting up the DN pipeline
-    // attempt to close the file. This will fail (return false) as the NN will
+    ExtendedBlock finalBlock = blk.getBlock();
+    if (bytesToWrite != null) {
+      // Here we create an output stream and then abort it so the block gets
+      // created on the datanode, but we never send the message to tell the
+      // DN to complete the block. This simulates the client crashing after
+      // it wrote the data, but before the file gets closed.
+      DFSOutputStream s = new DFSOutputStream(client, file, stat,
+          EnumSet.of(CreateFlag.CREATE), null,
+          DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512),
+          null, true);
+      s.start();
+      s.write(bytesToWrite);
+      s.hflush();
+      finalBlock = s.getBlock();
+      s.abort();
+    }
+    // Attempt to close the file. This will fail (return false) as the NN will
     // be expecting the registered block to be reported from the DNs via IBR,
-    // but that will never happen, as the pipeline was never established
+    // but that will never happen, as we either did not write it, or we aborted
+    // the stream, preventing the "close block" message from reaching the DN.
     boolean closed = client.getNamenode().complete(
-        file, "test client", blk.getBlock(), stat.getFileId());
+        file, client.clientName, finalBlock, stat.getFileId());
     assertEquals(false, closed);
   }
 