|
@@ -35,6 +35,9 @@ import java.util.Random;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
|
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
|
|
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
|
|
|
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
@@ -567,4 +570,56 @@ public class TestDataTransferProtocol {
|
|
0, block.getNumBytes(), block.getNumBytes(), newGS,
|
|
0, block.getNumBytes(), block.getNumBytes(), newGS,
|
|
checksum, CachingStrategy.newDefaultStrategy(), false, false, null);
|
|
checksum, CachingStrategy.newDefaultStrategy(), false, false, null);
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ @Test(timeout = 30000)
|
|
|
|
+ public void testReleaseVolumeRefIfExceptionThrown()
|
|
|
|
+ throws IOException, InterruptedException {
|
|
|
|
+ Path file = new Path("dataprotocol.dat");
|
|
|
|
+ int numDataNodes = 1;
|
|
|
|
+
|
|
|
|
+ Configuration conf = new HdfsConfiguration();
|
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, numDataNodes);
|
|
|
|
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
|
|
|
|
+ numDataNodes).build();
|
|
|
|
+ try {
|
|
|
|
+ cluster.waitActive();
|
|
|
|
+ datanode = cluster.getFileSystem().getDataNodeStats(
|
|
|
|
+ DatanodeReportType.LIVE)[0];
|
|
|
|
+ dnAddr = NetUtils.createSocketAddr(datanode.getXferAddr());
|
|
|
|
+ FileSystem fileSys = cluster.getFileSystem();
|
|
|
|
+
|
|
|
|
+ int fileLen = Math.min(
|
|
|
|
+ conf.getInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096), 4096);
|
|
|
|
+
|
|
|
|
+ DFSTestUtil.createFile(fileSys, file, fileLen, fileLen,
|
|
|
|
+ fileSys.getDefaultBlockSize(file),
|
|
|
|
+ fileSys.getDefaultReplication(file), 0L);
|
|
|
|
+
|
|
|
|
+ // Get the first blockid for the file.
|
|
|
|
+ final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
|
|
|
|
+
|
|
|
|
+ String bpid = cluster.getNamesystem().getBlockPoolId();
|
|
|
|
+ ExtendedBlock blk = new ExtendedBlock(bpid, firstBlock.getLocalBlock());
|
|
|
|
+ sendBuf.reset();
|
|
|
|
+ recvBuf.reset();
|
|
|
|
+
|
|
|
|
+ // Delete the meta file to create a exception in BlockSender constructor.
|
|
|
|
+ DataNode dn = cluster.getDataNodes().get(0);
|
|
|
|
+ cluster.getMaterializedReplica(0, blk).deleteMeta();
|
|
|
|
+
|
|
|
|
+ FsVolumeImpl volume = (FsVolumeImpl) DataNodeTestUtils.getFSDataset(
|
|
|
|
+ dn).getVolume(blk);
|
|
|
|
+ int beforeCnt = volume.getReferenceCount();
|
|
|
|
+
|
|
|
|
+ sender.copyBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN);
|
|
|
|
+ sendRecvData("Copy a block.", false);
|
|
|
|
+ Thread.sleep(3000);
|
|
|
|
+
|
|
|
|
+ int afterCnt = volume.getReferenceCount();
|
|
|
|
+ assertEquals(beforeCnt, afterCnt);
|
|
|
|
+
|
|
|
|
+ } finally {
|
|
|
|
+ cluster.shutdown();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|