|
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapred;
|
|
|
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.assertNull;
|
|
|
+import static org.junit.Assert.assertNotNull;
|
|
|
import static org.mockito.Matchers.isA;
|
|
|
import static org.mockito.Mockito.atLeastOnce;
|
|
|
import static org.mockito.Mockito.mock;
|
|
@@ -35,6 +36,7 @@ import org.apache.hadoop.mapreduce.Cluster;
|
|
|
import org.apache.hadoop.mapreduce.Job;
|
|
|
import org.apache.hadoop.mapreduce.JobPriority;
|
|
|
import org.apache.hadoop.mapreduce.JobStatus;
|
|
|
+import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
|
import org.apache.hadoop.mapreduce.TaskReport;
|
|
|
import org.apache.hadoop.mapreduce.TaskType;
|
|
|
import org.junit.Assert;
|
|
@@ -52,6 +54,42 @@ public class JobClientUnitTest {
|
|
|
void setCluster(Cluster cluster) {
|
|
|
this.cluster = cluster;
|
|
|
}
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ public class TestJobClientGetJob extends TestJobClient {
|
|
|
+
|
|
|
+ int lastGetJobRetriesCounter = 0;
|
|
|
+ int getJobRetriesCounter = 0;
|
|
|
+ int getJobRetries = 0;
|
|
|
+ RunningJob runningJob;
|
|
|
+
|
|
|
+ TestJobClientGetJob(JobConf jobConf) throws IOException {
|
|
|
+ super(jobConf);
|
|
|
+ }
|
|
|
+
|
|
|
+ public int getLastGetJobRetriesCounter() {
|
|
|
+ return lastGetJobRetriesCounter;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setGetJobRetries(int getJobRetries) {
|
|
|
+ this.getJobRetries = getJobRetries;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setRunningJob(RunningJob runningJob) {
|
|
|
+ this.runningJob = runningJob;
|
|
|
+ }
|
|
|
+
|
|
|
+ protected RunningJob getJobInner(final JobID jobid) throws IOException {
|
|
|
+ if (getJobRetriesCounter >= getJobRetries) {
|
|
|
+ lastGetJobRetriesCounter = getJobRetriesCounter;
|
|
|
+ getJobRetriesCounter = 0;
|
|
|
+ return runningJob;
|
|
|
+ }
|
|
|
+ getJobRetriesCounter++;
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -124,6 +162,7 @@ public class JobClientUnitTest {
|
|
|
|
|
|
JobStatus mockJobStatus = mock(JobStatus.class);
|
|
|
when(mockJobStatus.getJobID()).thenReturn(jobID);
|
|
|
+ when(mockJobStatus.getJobName()).thenReturn(jobID.toString());
|
|
|
when(mockJobStatus.getState()).thenReturn(JobStatus.State.RUNNING);
|
|
|
when(mockJobStatus.getStartTime()).thenReturn(startTime);
|
|
|
when(mockJobStatus.getUsername()).thenReturn("mockuser");
|
|
@@ -181,4 +220,30 @@ public class JobClientUnitTest {
|
|
|
assertNull(client.getJob(id));
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testGetJobRetry() throws Exception {
|
|
|
+
|
|
|
+ //To prevent the test from running for a very long time, lower the retry
|
|
|
+ JobConf conf = new JobConf();
|
|
|
+ conf.set(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES, "3");
|
|
|
+
|
|
|
+ TestJobClientGetJob client = new TestJobClientGetJob(conf);
|
|
|
+ JobID id = new JobID("ajob",1);
|
|
|
+ RunningJob rj = mock(RunningJob.class);
|
|
|
+ client.setRunningJob(rj);
|
|
|
+
|
|
|
+ //no retry
|
|
|
+ assertNotNull(client.getJob(id));
|
|
|
+ assertEquals(client.getLastGetJobRetriesCounter(), 0);
|
|
|
+
|
|
|
+ //3 retry
|
|
|
+ client.setGetJobRetries(3);
|
|
|
+ assertNotNull(client.getJob(id));
|
|
|
+ assertEquals(client.getLastGetJobRetriesCounter(), 3);
|
|
|
+
|
|
|
+ //beyond MAPREDUCE_JOBCLIENT_GETJOB_MAX_RETRY_KEY, will get null
|
|
|
+ client.setGetJobRetries(5);
|
|
|
+ assertNull(client.getJob(id));
|
|
|
+ }
|
|
|
+
|
|
|
}
|