|
@@ -25,7 +25,6 @@ import java.io.PrintStream;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
-import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.Random;
|
|
|
|
|
@@ -55,6 +54,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DecommissionManager;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
|
|
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
|
|
+import org.apache.hadoop.hdfs.util.HostsFileWriter;
|
|
|
import org.apache.log4j.Level;
|
|
|
import org.apache.log4j.Logger;
|
|
|
import org.junit.After;
|
|
@@ -71,10 +71,8 @@ public class TestDecommissioningStatus {
|
|
|
private static final int numDatanodes = 2;
|
|
|
private static MiniDFSCluster cluster;
|
|
|
private static FileSystem fileSys;
|
|
|
- private static Path excludeFile;
|
|
|
- private static FileSystem localFileSys;
|
|
|
+ private static HostsFileWriter hostsFileWriter;
|
|
|
private static Configuration conf;
|
|
|
- private static Path dir;
|
|
|
|
|
|
final ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes);
|
|
|
|
|
@@ -85,14 +83,8 @@ public class TestDecommissioningStatus {
|
|
|
false);
|
|
|
|
|
|
// Set up the hosts/exclude files.
|
|
|
- localFileSys = FileSystem.getLocal(conf);
|
|
|
- Path workingDir = localFileSys.getWorkingDirectory();
|
|
|
- dir = new Path(workingDir, "build/test/data/work-dir/decommission");
|
|
|
- assertTrue(localFileSys.mkdirs(dir));
|
|
|
- excludeFile = new Path(dir, "exclude");
|
|
|
- conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
|
|
|
- Path includeFile = new Path(dir, "include");
|
|
|
- conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
|
|
|
+ hostsFileWriter = new HostsFileWriter();
|
|
|
+ hostsFileWriter.initialize(conf, "work-dir/decommission");
|
|
|
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
|
|
|
1000);
|
|
|
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
|
@@ -102,9 +94,6 @@ public class TestDecommissioningStatus {
|
|
|
conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1);
|
|
|
conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, 1);
|
|
|
|
|
|
- writeConfigFile(localFileSys, excludeFile, null);
|
|
|
- writeConfigFile(localFileSys, includeFile, null);
|
|
|
-
|
|
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes).build();
|
|
|
cluster.waitActive();
|
|
|
fileSys = cluster.getFileSystem();
|
|
@@ -115,31 +104,13 @@ public class TestDecommissioningStatus {
|
|
|
|
|
|
@After
|
|
|
public void tearDown() throws Exception {
|
|
|
- if (localFileSys != null ) cleanupFile(localFileSys, dir);
|
|
|
+ if (hostsFileWriter != null) {
|
|
|
+ hostsFileWriter.cleanup();
|
|
|
+ }
|
|
|
if(fileSys != null) fileSys.close();
|
|
|
if(cluster != null) cluster.shutdown();
|
|
|
}
|
|
|
|
|
|
- private static void writeConfigFile(FileSystem fs, Path name,
|
|
|
- ArrayList<String> nodes) throws IOException {
|
|
|
-
|
|
|
- // delete if it already exists
|
|
|
- if (fs.exists(name)) {
|
|
|
- fs.delete(name, true);
|
|
|
- }
|
|
|
-
|
|
|
- FSDataOutputStream stm = fs.create(name);
|
|
|
-
|
|
|
- if (nodes != null) {
|
|
|
- for (Iterator<String> it = nodes.iterator(); it.hasNext();) {
|
|
|
- String node = it.next();
|
|
|
- stm.writeBytes(node);
|
|
|
- stm.writeBytes("\n");
|
|
|
- }
|
|
|
- }
|
|
|
- stm.close();
|
|
|
- }
|
|
|
-
|
|
|
private FSDataOutputStream writeIncompleteFile(FileSystem fileSys, Path name,
|
|
|
short repl) throws IOException {
|
|
|
// create and write a file that contains three blocks of data
|
|
@@ -169,25 +140,25 @@ public class TestDecommissioningStatus {
|
|
|
* Decommissions the node at the given index
|
|
|
*/
|
|
|
private String decommissionNode(FSNamesystem namesystem, DFSClient client,
|
|
|
- FileSystem localFileSys, int nodeIndex) throws IOException {
|
|
|
+ int nodeIndex) throws IOException {
|
|
|
DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
|
|
|
|
|
|
String nodename = info[nodeIndex].getXferAddr();
|
|
|
- decommissionNode(namesystem, localFileSys, nodename);
|
|
|
+ decommissionNode(namesystem, nodename);
|
|
|
return nodename;
|
|
|
}
|
|
|
|
|
|
/*
|
|
|
* Decommissions the node by name
|
|
|
*/
|
|
|
- private void decommissionNode(FSNamesystem namesystem,
|
|
|
- FileSystem localFileSys, String dnName) throws IOException {
|
|
|
+ private void decommissionNode(FSNamesystem namesystem, String dnName)
|
|
|
+ throws IOException {
|
|
|
System.out.println("Decommissioning node: " + dnName);
|
|
|
|
|
|
// write nodename into the exclude file.
|
|
|
ArrayList<String> nodes = new ArrayList<String>(decommissionedNodes);
|
|
|
nodes.add(dnName);
|
|
|
- writeConfigFile(localFileSys, excludeFile, nodes);
|
|
|
+ hostsFileWriter.initExcludeHosts(nodes.toArray(new String[0]));
|
|
|
}
|
|
|
|
|
|
private void checkDecommissionStatus(DatanodeDescriptor decommNode,
|
|
@@ -280,7 +251,7 @@ public class TestDecommissioningStatus {
|
|
|
FSNamesystem fsn = cluster.getNamesystem();
|
|
|
final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
|
|
|
for (int iteration = 0; iteration < numDatanodes; iteration++) {
|
|
|
- String downnode = decommissionNode(fsn, client, localFileSys, iteration);
|
|
|
+ String downnode = decommissionNode(fsn, client, iteration);
|
|
|
dm.refreshNodes(conf);
|
|
|
decommissionedNodes.add(downnode);
|
|
|
BlockManagerTestUtil.recheckDecommissionState(dm);
|
|
@@ -307,7 +278,7 @@ public class TestDecommissioningStatus {
|
|
|
// Call refreshNodes on FSNamesystem with empty exclude file.
|
|
|
// This will remove the datanodes from decommissioning list and
|
|
|
// make them available again.
|
|
|
- writeConfigFile(localFileSys, excludeFile, null);
|
|
|
+ hostsFileWriter.initExcludeHost("");
|
|
|
dm.refreshNodes(conf);
|
|
|
st1.close();
|
|
|
cleanupFile(fileSys, file1);
|
|
@@ -337,7 +308,7 @@ public class TestDecommissioningStatus {
|
|
|
// Decommission the DN.
|
|
|
FSNamesystem fsn = cluster.getNamesystem();
|
|
|
final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
|
|
|
- decommissionNode(fsn, localFileSys, dnName);
|
|
|
+ decommissionNode(fsn, dnName);
|
|
|
dm.refreshNodes(conf);
|
|
|
|
|
|
// Stop the DN when decommission is in progress.
|
|
@@ -384,7 +355,7 @@ public class TestDecommissioningStatus {
|
|
|
// Call refreshNodes on FSNamesystem with empty exclude file.
|
|
|
// This will remove the datanodes from decommissioning list and
|
|
|
// make them available again.
|
|
|
- writeConfigFile(localFileSys, excludeFile, null);
|
|
|
+ hostsFileWriter.initExcludeHost("");
|
|
|
dm.refreshNodes(conf);
|
|
|
}
|
|
|
|
|
@@ -405,7 +376,7 @@ public class TestDecommissioningStatus {
|
|
|
FSNamesystem fsn = cluster.getNamesystem();
|
|
|
final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
|
|
|
DatanodeDescriptor dnDescriptor = dm.getDatanode(dnID);
|
|
|
- decommissionNode(fsn, localFileSys, dnName);
|
|
|
+ decommissionNode(fsn, dnName);
|
|
|
dm.refreshNodes(conf);
|
|
|
BlockManagerTestUtil.recheckDecommissionState(dm);
|
|
|
assertTrue(dnDescriptor.isDecommissioned());
|
|
@@ -416,7 +387,7 @@ public class TestDecommissioningStatus {
|
|
|
|
|
|
// Call refreshNodes on FSNamesystem with empty exclude file to remove the
|
|
|
// datanode from decommissioning list and make it available again.
|
|
|
- writeConfigFile(localFileSys, excludeFile, null);
|
|
|
+ hostsFileWriter.initExcludeHost("");
|
|
|
dm.refreshNodes(conf);
|
|
|
}
|
|
|
}
|