|
@@ -36,6 +36,8 @@ import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.mapreduce.JobID;
|
|
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
|
|
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
|
|
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
|
@@ -54,6 +56,7 @@ import org.apache.hadoop.service.Service;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
+import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
@@ -279,14 +282,17 @@ import org.junit.Test;
|
|
|
}
|
|
|
|
|
|
private final class MRAppTestCleanup extends MRApp {
|
|
|
- boolean stoppedContainerAllocator;
|
|
|
- boolean cleanedBeforeContainerAllocatorStopped;
|
|
|
-
|
|
|
+ int stagingDirCleanedup;
|
|
|
+ int ContainerAllocatorStopped;
|
|
|
+ int JobHistoryEventHandlerStopped;
|
|
|
+ int numStops;
|
|
|
public MRAppTestCleanup(int maps, int reduces, boolean autoComplete,
|
|
|
String testName, boolean cleanOnStart) {
|
|
|
super(maps, reduces, autoComplete, testName, cleanOnStart);
|
|
|
- stoppedContainerAllocator = false;
|
|
|
- cleanedBeforeContainerAllocatorStopped = false;
|
|
|
+ stagingDirCleanedup = 0;
|
|
|
+ ContainerAllocatorStopped = 0;
|
|
|
+ JobHistoryEventHandlerStopped = 0;
|
|
|
+ numStops = 0;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -312,6 +318,26 @@ import org.junit.Test;
|
|
|
return newJob;
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
|
|
|
+ AppContext context) {
|
|
|
+ return new TestJobHistoryEventHandler(context, getStartCount());
|
|
|
+ }
|
|
|
+
|
|
|
+ private class TestJobHistoryEventHandler extends JobHistoryEventHandler {
|
|
|
+
|
|
|
+ public TestJobHistoryEventHandler(AppContext context, int startCount) {
|
|
|
+ super(context, startCount);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void serviceStop() throws Exception {
|
|
|
+ numStops++;
|
|
|
+ JobHistoryEventHandlerStopped = numStops;
|
|
|
+ super.serviceStop();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
protected ContainerAllocator createContainerAllocator(
|
|
|
ClientService clientService, AppContext context) {
|
|
@@ -334,7 +360,8 @@ import org.junit.Test;
|
|
|
|
|
|
@Override
|
|
|
protected void serviceStop() throws Exception {
|
|
|
- stoppedContainerAllocator = true;
|
|
|
+ numStops++;
|
|
|
+ ContainerAllocatorStopped = numStops;
|
|
|
super.serviceStop();
|
|
|
}
|
|
|
}
|
|
@@ -346,7 +373,8 @@ import org.junit.Test;
|
|
|
|
|
|
@Override
|
|
|
public void cleanupStagingDir() throws IOException {
|
|
|
- cleanedBeforeContainerAllocatorStopped = !stoppedContainerAllocator;
|
|
|
+ numStops++;
|
|
|
+ stagingDirCleanedup = numStops;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -377,11 +405,15 @@ import org.junit.Test;
|
|
|
app.verifyCompleted();
|
|
|
|
|
|
int waitTime = 20 * 1000;
|
|
|
- while (waitTime > 0 && !app.cleanedBeforeContainerAllocatorStopped) {
|
|
|
+ while (waitTime > 0 && app.numStops < 3 ) {
|
|
|
Thread.sleep(100);
|
|
|
waitTime -= 100;
|
|
|
}
|
|
|
- Assert.assertTrue("Staging directory not cleaned before notifying RM",
|
|
|
- app.cleanedBeforeContainerAllocatorStopped);
|
|
|
+
|
|
|
+ // assert JobHistoryEventHandlerStopped first, then
|
|
|
+ // ContainerAllocatorStopped, and then stagingDirCleanedup
|
|
|
+ Assert.assertEquals(1, app.JobHistoryEventHandlerStopped);
|
|
|
+ Assert.assertEquals(2, app.ContainerAllocatorStopped);
|
|
|
+ Assert.assertEquals(3, app.stagingDirCleanedup);
|
|
|
}
|
|
|
}
|