@@ -37,6 +37,7 @@ import java.util.Scanner;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
+import java.util.EnumSet;
 
 import java.util.function.Supplier;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
@@ -46,6 +47,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -55,6 +57,8 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
@@ -867,6 +871,81 @@ public class TestDecommission extends AdminStatesBaseTest {
         closedFileSet, openFilesMap, 0);
   }
 
+  /**
+   * Verify listing of open files that block decommission while a node
+   * is in DECOMMISSION_INPROGRESS:
+   * 1. start decommissioning a node (sets its LeavingServiceStatus);
+   * 2. close the open file and verify it is no longer listed.
+   * @throws Exception
+   */
+  @Test(timeout=180000)
+  public void testDecommissionWithCloseFileAndListOpenFiles()
+      throws Exception {
+    LOG.info("Starting test testDecommissionWithCloseFileAndListOpenFiles");
+
+    // Disable the redundancy monitor check so that open files blocking
+    // decommission can be listed and verified.
+    getConf().setInt(
+        DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1000);
+    getConf().setLong(
+        DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, 1);
+
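+    // Start a small cluster and write a file that is left open (under
+    // construction), then trigger block reports from every DataNode.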
+    startSimpleCluster(1, 3);
+    FileSystem fileSys = getCluster().getFileSystem(0);
+    FSNamesystem ns = getCluster().getNamesystem(0);
+    Path file = new Path("/openFile");
+    FSDataOutputStream st = AdminStatesBaseTest.writeIncompleteFile(fileSys,
+        file, (short)3, (short)(fileSize / blockSize));
+    for (DataNode d : getCluster().getDataNodes()) {
+      DataNodeTestUtils.triggerBlockReport(d);
+    }
+
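+    // Locate the open file's last block and pick one of its replica
+    // DataNodes as the node to decommission.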
+    LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(
+        getCluster().getNameNode(0), file.toUri().getPath(),
+        0, blockSize * 10);
+    DatanodeInfo dnToDecommission = lbs.getLastLocatedBlock().getLocations()[0];
+
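+    // Exclude the chosen DataNode and wait for it to enter the
+    // DECOMMISSION_INPROGRESS state.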
+    DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
+    dnToDecommission = dm.getDatanode(dnToDecommission.getDatanodeUuid());
+    initExcludeHost(dnToDecommission.getXferAddr());
+    refreshNodes(0);
+    BlockManagerTestUtil.recheckDecommissionState(dm);
+    waitNodeState(dnToDecommission, AdminStates.DECOMMISSION_INPROGRESS);
+    // Sleep so that the DatanodeAdminMonitor (or the
+    // DatanodeAdminBackoffMonitor) has run at least twice.
+    Thread.sleep(3000);
+
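+    // While decommission is blocked by the open file, the file should
+    // appear in the BLOCKING_DECOMMISSION open-files listing.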
+    BatchedEntries<OpenFileEntry> batchedListEntries = getCluster().
+        getNameNodeRpc(0).listOpenFiles(0,
+            EnumSet.of(OpenFilesIterator.OpenFilesType.BLOCKING_DECOMMISSION),
+            OpenFilesIterator.FILTER_PATH_DEFAULT);
+    assertEquals(1, batchedListEntries.size());
+    st.close(); // Close the open file.
+
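+    // Once closed, the file should drop out of the listing, and the
+    // listing must not throw an NPE even if a lease is still present.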
+    try {
+      batchedListEntries = getCluster().getNameNodeRpc(0).listOpenFiles(0,
+          EnumSet.of(OpenFilesIterator.OpenFilesType.BLOCKING_DECOMMISSION),
+          OpenFilesIterator.FILTER_PATH_DEFAULT);
+      assertEquals(0, batchedListEntries.size());
+    } catch (NullPointerException e) {
+      Assert.fail("Should not throw NPE when the file is not under " +
+          "construction but still has a lease!");
+    }
+    initExcludeHost("");
+    refreshNodes(0);
+    fileSys.delete(file, false);
+  }
+
   @Test(timeout = 360000)
   public void testDecommissionWithOpenFileAndBlockRecovery()
       throws IOException, InterruptedException {