|
@@ -17,20 +17,33 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.mapred;
|
|
|
|
|
|
+import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.assertFalse;
|
|
|
import static org.junit.Assert.assertNotNull;
|
|
|
import static org.junit.Assert.assertNull;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
+import static org.mockito.Matchers.any;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
import static org.mockito.Mockito.verify;
|
|
|
+import static org.mockito.Mockito.when;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.util.Arrays;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.mapreduce.TaskType;
|
|
|
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
|
|
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
|
|
+import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
|
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
public class TestTaskAttemptListenerImpl {
|
|
@@ -115,4 +128,67 @@ public class TestTaskAttemptListenerImpl {
|
|
|
|
|
|
listener.stop();
|
|
|
}
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testGetMapCompletionEvents() throws IOException {
|
|
|
+ TaskAttemptCompletionEvent[] empty = {};
|
|
|
+ TaskAttemptCompletionEvent[] taskEvents = {
|
|
|
+ createTce(0, true, TaskAttemptCompletionEventStatus.OBSOLETE),
|
|
|
+ createTce(1, false, TaskAttemptCompletionEventStatus.FAILED),
|
|
|
+ createTce(2, true, TaskAttemptCompletionEventStatus.SUCCEEDED),
|
|
|
+ createTce(3, false, TaskAttemptCompletionEventStatus.FAILED) };
|
|
|
+ TaskAttemptCompletionEvent[] mapEvents = { taskEvents[0], taskEvents[2] };
|
|
|
+ Job mockJob = mock(Job.class);
|
|
|
+ when(mockJob.getTaskAttemptCompletionEvents(0, 100))
|
|
|
+ .thenReturn(taskEvents);
|
|
|
+ when(mockJob.getTaskAttemptCompletionEvents(0, 2))
|
|
|
+ .thenReturn(Arrays.copyOfRange(taskEvents, 0, 2));
|
|
|
+ when(mockJob.getTaskAttemptCompletionEvents(2, 100))
|
|
|
+ .thenReturn(Arrays.copyOfRange(taskEvents, 2, 4));
|
|
|
+ when(mockJob.getMapAttemptCompletionEvents(0, 100)).thenReturn(mapEvents);
|
|
|
+ when(mockJob.getMapAttemptCompletionEvents(0, 2)).thenReturn(mapEvents);
|
|
|
+ when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn(empty);
|
|
|
+
|
|
|
+ AppContext appCtx = mock(AppContext.class);
|
|
|
+ when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
|
|
|
+ JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
|
|
|
+ final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
|
|
|
+ TaskAttemptListenerImpl listener =
|
|
|
+ new TaskAttemptListenerImpl(appCtx, secret) {
|
|
|
+ @Override
|
|
|
+ protected void registerHeartbeatHandler(Configuration conf) {
|
|
|
+ taskHeartbeatHandler = hbHandler;
|
|
|
+ }
|
|
|
+ };
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ listener.init(conf);
|
|
|
+ listener.start();
|
|
|
+
|
|
|
+ JobID jid = new JobID("12345", 1);
|
|
|
+ TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 0);
|
|
|
+ MapTaskCompletionEventsUpdate update =
|
|
|
+ listener.getMapCompletionEvents(jid, 0, 100, tid);
|
|
|
+ assertEquals(2, update.events.length);
|
|
|
+ update = listener.getMapCompletionEvents(jid, 0, 2, tid);
|
|
|
+ assertEquals(2, update.events.length);
|
|
|
+ update = listener.getMapCompletionEvents(jid, 2, 100, tid);
|
|
|
+ assertEquals(0, update.events.length);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static TaskAttemptCompletionEvent createTce(int eventId,
|
|
|
+ boolean isMap, TaskAttemptCompletionEventStatus status) {
|
|
|
+ JobId jid = MRBuilderUtils.newJobId(12345, 1, 1);
|
|
|
+ TaskId tid = MRBuilderUtils.newTaskId(jid, 0,
|
|
|
+ isMap ? org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP
|
|
|
+ : org.apache.hadoop.mapreduce.v2.api.records.TaskType.REDUCE);
|
|
|
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(tid, 0);
|
|
|
+ RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
|
|
+ TaskAttemptCompletionEvent tce = recordFactory
|
|
|
+ .newRecordInstance(TaskAttemptCompletionEvent.class);
|
|
|
+ tce.setEventId(eventId);
|
|
|
+ tce.setAttemptId(attemptId);
|
|
|
+ tce.setStatus(status);
|
|
|
+ return tce;
|
|
|
+ }
|
|
|
+
|
|
|
}
|