@@ -30,6 +30,7 @@ import junit.framework.TestCase;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
@@ -375,4 +376,82 @@ public class TestNodeRefresh extends TestCase {
 
     stopCluster();
   }
+
+  /**
+   * Check if excluded hosts are decommissioned across restarts
+   */
+  public void testMRExcludeHostsAcrossRestarts() throws IOException {
+    // start a cluster with 2 hosts and empty exclude-hosts file
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.jobtracker.restart.recover", true);
+
+    File file = new File("hosts.exclude");
+    file.delete();
+    startCluster(2, 1, 0, conf);
+    String hostToDecommission = getHostname(1);
+    conf = mr.createJobConf(new JobConf(conf));
+
+    // submit a job
+    Path inDir = new Path("input");
+    Path outDir = new Path("output");
+    Path signalFilename = new Path("share");
+    JobConf newConf = new JobConf(conf);
+    UtilsForTests.configureWaitingJobConf(newConf, inDir, outDir, 30, 1,
+        "restart-decommission", signalFilename.toString(),
+        signalFilename.toString());
+
+    JobClient jobClient = new JobClient(newConf);
+    RunningJob job = jobClient.submitJob(newConf);
+    JobID id = job.getID();
+
+    // wait for 50%
+    while (job.mapProgress() < 0.5f) {
+      UtilsForTests.waitFor(100);
+    }
+
+    // change the exclude-hosts file to include one host
+    FileOutputStream out = new FileOutputStream(file);
+    LOG.info("Writing excluded nodes to exclude file " + file.toString());
+    BufferedWriter writer = null;
+    try {
+      writer = new BufferedWriter(new OutputStreamWriter(out));
+      writer.write(hostToDecommission + "\n"); // decommission first host
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+      out.close();
+    }
+    file.deleteOnExit();
+
+    // restart the jobtracker
+    mr.stopJobTracker();
+    mr.startJobTracker();
+    // Wait for the JT to be ready
+    UtilsForTests.waitForJobTracker(jobClient);
+
+    jt = mr.getJobTrackerRunner().getJobTracker();
+    UtilsForTests.signalTasks(dfs, dfs.getFileSystem(),
+        signalFilename.toString(), signalFilename.toString(), 1);
+
+    assertTrue("Decommissioning of tracker has no effect on the restarted job",
+        jt.getJob(job.getID()).failedMapTasks > 0);
+
+    // check the cluster status and tracker size
+    assertEquals("Tracker is not lost upon host decommissioning",
+                 1, jt.getClusterStatus(false).getTaskTrackers());
+    assertEquals("Excluded node count is incorrect",
+                 1, jt.getClusterStatus(false).getNumExcludedNodes());
+
+    // check if the host is disallowed
+    for (TaskTrackerStatus status : jt.taskTrackers()) {
+      assertFalse("Tracker from decommissioned host still exists",
+                  status.getHost().equals(hostToDecommission));
+    }
+
+    // wait for the job
+    job.waitForCompletion();
+
+    stopCluster();
+  }
 }