|
@@ -37,14 +37,20 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
|
import org.apache.hadoop.mapreduce.OutputCommitter;
|
|
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
|
|
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
|
|
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
+import org.apache.hadoop.yarn.util.Records;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.Test;
|
|
|
|
|
@@ -233,4 +239,69 @@ public class TestJobImpl {
|
|
|
Assert.assertTrue(job5.checkAccess(ugi1, null));
|
|
|
Assert.assertTrue(job5.checkAccess(ugi2, null));
|
|
|
}
|
|
|
+ @Test
|
|
|
+ public void testUberDecision() throws Exception {
|
|
|
+
|
|
|
+ // with default values, no of maps is 2
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ boolean isUber = testUberDecision(conf);
|
|
|
+ Assert.assertFalse(isUber);
|
|
|
+
|
|
|
+ // enable uber mode, no of maps is 2
|
|
|
+ conf = new Configuration();
|
|
|
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
|
|
|
+ isUber = testUberDecision(conf);
|
|
|
+ Assert.assertTrue(isUber);
|
|
|
+
|
|
|
+ // enable uber mode, no of maps is 2, no of reduces is 1 and uber task max
|
|
|
+ // reduces is 0
|
|
|
+ conf = new Configuration();
|
|
|
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
|
|
|
+ conf.setInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 0);
|
|
|
+ conf.setInt(MRJobConfig.NUM_REDUCES, 1);
|
|
|
+ isUber = testUberDecision(conf);
|
|
|
+ Assert.assertFalse(isUber);
|
|
|
+
|
|
|
+ // enable uber mode, no of maps is 2, no of reduces is 1 and uber task max
|
|
|
+ // reduces is 1
|
|
|
+ conf = new Configuration();
|
|
|
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
|
|
|
+ conf.setInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
|
|
|
+ conf.setInt(MRJobConfig.NUM_REDUCES, 1);
|
|
|
+ isUber = testUberDecision(conf);
|
|
|
+ Assert.assertTrue(isUber);
|
|
|
+
|
|
|
+ // enable uber mode, no of maps is 2 and uber task max maps is 0
|
|
|
+ conf = new Configuration();
|
|
|
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
|
|
|
+ conf.setInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 1);
|
|
|
+ isUber = testUberDecision(conf);
|
|
|
+ Assert.assertFalse(isUber);
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean testUberDecision(Configuration conf) {
|
|
|
+ JobID jobID = JobID.forName("job_1234567890000_0001");
|
|
|
+ JobId jobId = TypeConverter.toYarn(jobID);
|
|
|
+ MRAppMetrics mrAppMetrics = MRAppMetrics.create();
|
|
|
+ JobImpl job = new JobImpl(jobId, Records
|
|
|
+ .newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
|
|
|
+ null, mock(JobTokenSecretManager.class), null, null, null,
|
|
|
+ mrAppMetrics, mock(OutputCommitter.class), true, null, 0, null, null);
|
|
|
+ InitTransition initTransition = getInitTransition();
|
|
|
+ JobEvent mockJobEvent = mock(JobEvent.class);
|
|
|
+ initTransition.transition(job, mockJobEvent);
|
|
|
+ boolean isUber = job.isUber();
|
|
|
+ return isUber;
|
|
|
+ }
|
|
|
+
|
|
|
+ private InitTransition getInitTransition() {
|
|
|
+ InitTransition initTransition = new InitTransition() {
|
|
|
+ @Override
|
|
|
+ protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
|
|
|
+ return new TaskSplitMetaInfo[] { new TaskSplitMetaInfo(),
|
|
|
+ new TaskSplitMetaInfo() };
|
|
|
+ }
|
|
|
+ };
|
|
|
+ return initTransition;
|
|
|
+ }
|
|
|
}
|