|
@@ -25,10 +25,13 @@ import static org.mockito.Mockito.spy;
|
|
|
import static org.mockito.Mockito.times;
|
|
|
import static org.mockito.Mockito.verify;
|
|
|
import static org.mockito.Mockito.when;
|
|
|
+import static org.mockito.Mockito.never;
|
|
|
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
|
|
|
+import junit.framework.Assert;
|
|
|
+
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -43,6 +46,7 @@ import org.apache.hadoop.mapreduce.TypeConverter;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
|
|
|
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
@@ -229,6 +233,98 @@ public class TestJobHistoryEventHandler {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // In case of all types of events, process Done files if it's last AM retry
|
|
|
+ @Test (timeout=50000)
|
|
|
+ public void testProcessDoneFilesOnLastAMRetry() throws Exception {
|
|
|
+ TestParams t = new TestParams(true);
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+
|
|
|
+ JHEvenHandlerForTest realJheh =
|
|
|
+ new JHEvenHandlerForTest(t.mockAppContext, 0);
|
|
|
+ JHEvenHandlerForTest jheh = spy(realJheh);
|
|
|
+ jheh.init(conf);
|
|
|
+
|
|
|
+ EventWriter mockWriter = null;
|
|
|
+ try {
|
|
|
+ jheh.start();
|
|
|
+ handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
|
|
|
+ t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
|
|
|
+ verify(jheh, times(0)).processDoneFiles(any(JobId.class));
|
|
|
+
|
|
|
+ handleEvent(jheh, new JobHistoryEvent(t.jobId,
|
|
|
+ new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
|
|
|
+ 0, 0, JobStateInternal.ERROR.toString())));
|
|
|
+ verify(jheh, times(1)).processDoneFiles(any(JobId.class));
|
|
|
+
|
|
|
+ handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
|
|
|
+ TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
|
|
|
+ new Counters(), new Counters())));
|
|
|
+ verify(jheh, times(2)).processDoneFiles(any(JobId.class));
|
|
|
+
|
|
|
+ handleEvent(jheh, new JobHistoryEvent(t.jobId,
|
|
|
+ new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
|
|
|
+ 0, 0, JobStateInternal.FAILED.toString())));
|
|
|
+ verify(jheh, times(3)).processDoneFiles(any(JobId.class));
|
|
|
+
|
|
|
+ handleEvent(jheh, new JobHistoryEvent(t.jobId,
|
|
|
+ new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
|
|
|
+ 0, 0, JobStateInternal.KILLED.toString())));
|
|
|
+ verify(jheh, times(4)).processDoneFiles(any(JobId.class));
|
|
|
+
|
|
|
+ mockWriter = jheh.getEventWriter();
|
|
|
+ verify(mockWriter, times(5)).write(any(HistoryEvent.class));
|
|
|
+ } finally {
|
|
|
+ jheh.stop();
|
|
|
+ verify(mockWriter).close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Skip processing Done files in case of ERROR, if it's not last AM retry
|
|
|
+ @Test (timeout=50000)
|
|
|
+ public void testProcessDoneFilesNotLastAMRetry() throws Exception {
|
|
|
+ TestParams t = new TestParams(false);
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ JHEvenHandlerForTest realJheh =
|
|
|
+ new JHEvenHandlerForTest(t.mockAppContext, 0);
|
|
|
+ JHEvenHandlerForTest jheh = spy(realJheh);
|
|
|
+ jheh.init(conf);
|
|
|
+
|
|
|
+ EventWriter mockWriter = null;
|
|
|
+ try {
|
|
|
+ jheh.start();
|
|
|
+ handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
|
|
|
+ t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
|
|
|
+ verify(jheh, times(0)).processDoneFiles(t.jobId);
|
|
|
+
|
|
|
+ // skip processing done files
|
|
|
+ handleEvent(jheh, new JobHistoryEvent(t.jobId,
|
|
|
+ new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
|
|
|
+ 0, 0, JobStateInternal.ERROR.toString())));
|
|
|
+ verify(jheh, times(0)).processDoneFiles(t.jobId);
|
|
|
+
|
|
|
+ handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
|
|
|
+ TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
|
|
|
+ new Counters(), new Counters())));
|
|
|
+ verify(jheh, times(1)).processDoneFiles(t.jobId);
|
|
|
+
|
|
|
+ handleEvent(jheh, new JobHistoryEvent(t.jobId,
|
|
|
+ new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
|
|
|
+ 0, 0, JobStateInternal.FAILED.toString())));
|
|
|
+ verify(jheh, times(2)).processDoneFiles(t.jobId);
|
|
|
+
|
|
|
+ handleEvent(jheh, new JobHistoryEvent(t.jobId,
|
|
|
+ new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
|
|
|
+ 0, 0, JobStateInternal.KILLED.toString())));
|
|
|
+ verify(jheh, times(3)).processDoneFiles(t.jobId);
|
|
|
+
|
|
|
+ mockWriter = jheh.getEventWriter();
|
|
|
+ verify(mockWriter, times(5)).write(any(HistoryEvent.class));
|
|
|
+ } finally {
|
|
|
+ jheh.stop();
|
|
|
+ verify(mockWriter).close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private void queueEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event) {
|
|
|
jheh.handle(event);
|
|
|
}
|
|
@@ -258,20 +354,23 @@ public class TestJobHistoryEventHandler {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private AppContext mockAppContext(ApplicationId appId) {
|
|
|
+ private AppContext mockAppContext(ApplicationId appId, boolean isLastAMRetry) {
|
|
|
JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appId));
|
|
|
AppContext mockContext = mock(AppContext.class);
|
|
|
Job mockJob = mock(Job.class);
|
|
|
+ when(mockJob.getAllCounters()).thenReturn(new Counters());
|
|
|
when(mockJob.getTotalMaps()).thenReturn(10);
|
|
|
when(mockJob.getTotalReduces()).thenReturn(10);
|
|
|
when(mockJob.getName()).thenReturn("mockjob");
|
|
|
when(mockContext.getJob(jobId)).thenReturn(mockJob);
|
|
|
when(mockContext.getApplicationID()).thenReturn(appId);
|
|
|
+ when(mockContext.isLastAMRetry()).thenReturn(isLastAMRetry);
|
|
|
return mockContext;
|
|
|
}
|
|
|
|
|
|
|
|
|
private class TestParams {
|
|
|
+ boolean isLastAMRetry;
|
|
|
String workDir = setupTestWorkDir();
|
|
|
ApplicationId appId = ApplicationId.newInstance(200, 1);
|
|
|
ApplicationAttemptId appAttemptId =
|
|
@@ -279,7 +378,15 @@ public class TestJobHistoryEventHandler {
|
|
|
ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
|
|
|
TaskID taskID = TaskID.forName("task_200707121733_0003_m_000005");
|
|
|
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
|
|
|
- AppContext mockAppContext = mockAppContext(appId);
|
|
|
+ AppContext mockAppContext;
|
|
|
+
|
|
|
+ public TestParams() {
|
|
|
+ this(false);
|
|
|
+ }
|
|
|
+ public TestParams(boolean isLastAMRetry) {
|
|
|
+ this.isLastAMRetry = isLastAMRetry;
|
|
|
+ mockAppContext = mockAppContext(appId, this.isLastAMRetry);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private JobHistoryEvent getEventToEnqueue(JobId jobId) {
|
|
@@ -344,7 +451,6 @@ public class TestJobHistoryEventHandler {
|
|
|
class JHEvenHandlerForTest extends JobHistoryEventHandler {
|
|
|
|
|
|
private EventWriter eventWriter;
|
|
|
-
|
|
|
public JHEvenHandlerForTest(AppContext context, int startCount) {
|
|
|
super(context, startCount);
|
|
|
}
|
|
@@ -367,6 +473,11 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
|
|
|
public EventWriter getEventWriter() {
|
|
|
return this.eventWriter;
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void processDoneFiles(JobId jobId){
|
|
|
+ // do nothing
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|