@@ -21,14 +21,19 @@ import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
@@ -50,6 +55,7 @@ import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.StringUtils;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
@@ -525,4 +531,78 @@ public class TestFsDatasetImpl {
LOG.info("Volumes removed");
brReceivedLatch.await();
}
+
+  /**
+   * Tests stopping all active DataXceiver threads on a volume failure event.
+   * @throws Exception
+   */
+  @Test
+  public void testCleanShutdownOfVolume() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration config = new HdfsConfiguration();
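+      // Bound the wait for DataXceiver threads to stop during volume
+      // removal to one second.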
+      config.setLong(
+          DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, 1000);
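+      // Tolerate one failed volume so that the DataNode keeps running after
+      // the volume below is made unusable.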
+      config.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+
+      cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      DataNode dataNode = cluster.getDataNodes().get(0);
+      Path filePath = new Path("test.dat");
+      // Create a file and keep the output stream unclosed.
+      FSDataOutputStream out = fs.create(filePath, (short) 1);
+      out.write(1);
+      out.hflush();
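+      // The stream stays open, so the block is still under construction and
+      // a DataXceiver thread keeps a reference on the volume storing it.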
+
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
+      FsVolumeImpl volume = (FsVolumeImpl) dataNode.getFSDataset().getVolume(
+          block);
+      File finalizedDir = volume.getFinalizedDir(cluster.getNamesystem()
+          .getBlockPoolId());
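+      // The finalized directory holds this block pool's completed block
+      // files and is one of the paths the disk checker inspects.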
+
+      if (finalizedDir.exists()) {
+        // Remove write and execute access so that checkDiskErrorThread
+        // detects this volume is bad.
+        finalizedDir.setExecutable(false);
+        finalizedDir.setWritable(false);
+      }
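+      // The writer opened above still holds a reference on this volume.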
+      Assert.assertTrue("Reference count for the volume should be greater "
+          + "than 0", volume.getReferenceCount() > 0);
+      // Invoke the synchronous checkDiskError method
+      dataNode.getFSDataset().checkDataDir();
+      // Give the DataNode a second to interrupt the active DataXceiver
+      // threads and release their references on the failed volume.
+      Thread.sleep(1000);
+      assertEquals("There are active threads still referencing volume: "
+          + volume.getBasePath(), 0, volume.getReferenceCount());
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, filePath).get(0);
+      DatanodeInfo info = lb.getLocations()[0];
+
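+      // close() should fail because the only replica sits on the failed
+      // volume; the exception message is expected to name this DataNode.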
+      try {
+        out.close();
+        Assert.fail("This is not a valid code path. "
+            + "out.close() should have thrown an exception.");
+      } catch (IOException ioe) {
+        Assert.assertTrue(ioe.getMessage().contains(info.toString()));
+      }
+      finalizedDir.setWritable(true);
+      finalizedDir.setExecutable(true);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
}