@@ -22,16 +22,23 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Scanner;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
+import org.apache.commons.lang.text.StrBuilder;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -60,7 +67,9 @@ import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
+import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Ignore;
@@ -651,6 +660,185 @@ public class TestDecommission extends AdminStatesBaseTest {
     fdos.close();
   }
 
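+  // Reads the captured tool output back line by line and collects it
+  // into a single string for substring matching.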
+  private static String scanIntoString(final ByteArrayOutputStream baos) {
+    final StrBuilder sb = new StrBuilder();
+    final Scanner scanner = new Scanner(baos.toString());
+    while (scanner.hasNextLine()) {
+      sb.appendln(scanner.nextLine());
+    }
+    scanner.close();
+    return sb.toString();
+  }
+
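+  // Returns true only when none of the closed files appear in the tool
+  // output and at least expOpenFilesListSize of the open files are listed.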
+  private boolean verifyOpenFilesListing(String message,
+      HashSet<Path> closedFileSet,
+      HashMap<Path, FSDataOutputStream> openFilesMap,
+      ByteArrayOutputStream out, int expOpenFilesListSize) {
+    final String outStr = scanIntoString(out);
+    LOG.info(message + " - stdout:\n" + outStr);
+    for (Path closedFilePath : closedFileSet) {
+      if (outStr.contains(closedFilePath.toString())) {
+        return false;
+      }
+    }
+    HashSet<Path> openFilesNotListed = new HashSet<>();
+    for (Path openFilePath : openFilesMap.keySet()) {
+      if (!outStr.contains(openFilePath.toString())) {
+        openFilesNotListed.add(openFilePath);
+      }
+    }
+    int actualOpenFilesListedSize =
+        openFilesMap.size() - openFilesNotListed.size();
+    if (actualOpenFilesListedSize >= expOpenFilesListSize) {
+      return true;
+    } else {
+      LOG.info("Open files that are not listed yet: " + openFilesNotListed);
+      return false;
+    }
+  }
+
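+  // Redirects System.out to capture the dfsadmin output, then polls
+  // "-listOpenFiles -blockingDecommission" until the listing matches.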
+  private void verifyOpenFilesBlockingDecommission(HashSet<Path> closedFileSet,
+      HashMap<Path, FSDataOutputStream> openFilesMap, final int maxOpenFiles)
+      throws Exception {
+    final PrintStream oldStreamOut = System.out;
+    try {
+      final ByteArrayOutputStream toolOut = new ByteArrayOutputStream();
+      System.setOut(new PrintStream(toolOut));
+      final DFSAdmin dfsAdmin = new DFSAdmin(getConf());
+
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          try {
+            toolOut.reset();
+            assertEquals(0, ToolRunner.run(dfsAdmin,
+                new String[]{"-listOpenFiles", "-blockingDecommission"}));
+            toolOut.flush();
+            return verifyOpenFilesListing(
+                "dfsadmin -listOpenFiles -blockingDecommission",
+                closedFileSet, openFilesMap, toolOut, maxOpenFiles);
+          } catch (Exception e) {
+            LOG.warn("Unexpected exception: " + e);
+          }
+          return false;
+        }
+      }, 1000, 60000);
+    } finally {
+      System.setOut(oldStreamOut);
+    }
+  }
+
+  @Test(timeout=180000)
+  public void testDecommissionWithOpenfileReporting()
+      throws Exception {
+    LOG.info("Starting test testDecommissionWithOpenfileReporting");
+
+    // Effectively disable the redundancy monitor check so that open files
+    // blocking decommission can be listed and verified.
+    getConf().setInt(
+        DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
+        1000);
+    getConf().setLong(
+        DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, 1);
+
+    // With replication 3 on 4 datanodes, at most 1 node can be decommissioned.
+    startSimpleCluster(1, 4);
+
+    FileSystem fileSys = getCluster().getFileSystem(0);
+    FSNamesystem ns = getCluster().getNamesystem(0);
+
+    final String[] closedFiles = new String[3];
+    final String[] openFiles = new String[3];
+    HashSet<Path> closedFileSet = new HashSet<>();
+    HashMap<Path, FSDataOutputStream> openFilesMap = new HashMap<>();
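+    // Create 3 closed files and 3 files kept open via append; the open
+    // files are the ones expected to block decommission.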
+    for (int i = 0; i < 3; i++) {
+      closedFiles[i] = "/testDecommissionWithOpenfileReporting.closed." + i;
+      openFiles[i] = "/testDecommissionWithOpenfileReporting.open." + i;
+      writeFile(fileSys, new Path(closedFiles[i]), (short)3, 10);
+      closedFileSet.add(new Path(closedFiles[i]));
+      writeFile(fileSys, new Path(openFiles[i]), (short)3, 10);
+      FSDataOutputStream fdos = fileSys.append(new Path(openFiles[i]));
+      openFilesMap.put(new Path(openFiles[i]), fdos);
+    }
+
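+    // Count how many of the open files' last blocks each datanode hosts.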
+    HashMap<DatanodeInfo, Integer> dnInfoMap = new HashMap<>();
+    for (int i = 0; i < 3; i++) {
+      LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(
+          getCluster().getNameNode(0), openFiles[i], 0, blockSize * 10);
+      for (DatanodeInfo dn : lbs.getLastLocatedBlock().getLocations()) {
+        if (dnInfoMap.containsKey(dn)) {
+          dnInfoMap.put(dn, dnInfoMap.get(dn) + 1);
+        } else {
+          dnInfoMap.put(dn, 1);
+        }
+      }
+    }
+
+    DatanodeInfo dnToDecommission = null;
+    int maxDnOccurrence = 0;
+    for (Map.Entry<DatanodeInfo, Integer> entry : dnInfoMap.entrySet()) {
+      if (entry.getValue() > maxDnOccurrence) {
+        maxDnOccurrence = entry.getValue();
+        dnToDecommission = entry.getKey();
+      }
+    }
+    LOG.info("Datanode to decommission: " + dnToDecommission
+        + ", max occurrence: " + maxDnOccurrence);
+
+    // Decommission the datanode that hosts the most last blocks.
+    DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
+    ArrayList<String> nodes = new ArrayList<>();
+    dnToDecommission = dm.getDatanode(dnToDecommission.getDatanodeUuid());
+    nodes.add(dnToDecommission.getXferAddr());
+    initExcludeHosts(nodes);
+    refreshNodes(0);
+    waitNodeState(dnToDecommission, AdminStates.DECOMMISSION_INPROGRESS);
+
+    // List and verify all the open files that are blocking decommission.
+    verifyOpenFilesBlockingDecommission(
+        closedFileSet, openFilesMap, maxDnOccurrence);
+
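+    // The redundancy monitor interval was set very high above, so drive
+    // replication checks manually until the node finishes decommissioning.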
+    final AtomicBoolean stopRedundancyMonitor = new AtomicBoolean(false);
+    Thread monitorThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (!stopRedundancyMonitor.get()) {
+          try {
+            BlockManagerTestUtil.checkRedundancy(
+                getCluster().getNamesystem().getBlockManager());
+            BlockManagerTestUtil.updateState(
+                getCluster().getNamesystem().getBlockManager());
+            Thread.sleep(1000);
+          } catch (Exception e) {
+            LOG.warn("Encountered exception during redundancy monitor: " + e);
+          }
+        }
+      }
+    });
+    monitorThread.start();
+
+    waitNodeState(dnToDecommission, AdminStates.DECOMMISSIONED);
+    stopRedundancyMonitor.set(true);
+    monitorThread.join();
+
+    // The open files no longer block decommission once all of their
+    // blocks have been re-replicated.
+    openFilesMap.clear();
+    verifyOpenFilesBlockingDecommission(
+        closedFileSet, openFilesMap, 0);
+  }
+
   @Test(timeout = 360000)
   public void testDecommissionWithOpenFileAndBlockRecovery()
       throws IOException, InterruptedException {