|
@@ -31,14 +31,15 @@ import static org.mockito.Mockito.when;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.Arrays;
|
|
|
-
|
|
|
-import junit.framework.Assert;
|
|
|
+import java.util.concurrent.ConcurrentMap;
|
|
|
+import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.mapreduce.TaskType;
|
|
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
|
|
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
|
@@ -46,15 +47,83 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
|
|
|
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
|
|
+import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
+import org.apache.hadoop.yarn.event.Event;
|
|
|
+import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
import org.apache.hadoop.yarn.util.SystemClock;
|
|
|
+import org.junit.After;
|
|
|
+import org.junit.Assert;
|
|
|
import org.junit.Test;
|
|
|
+import org.junit.runner.RunWith;
|
|
|
+import org.mockito.ArgumentCaptor;
|
|
|
+import org.mockito.Captor;
|
|
|
+import org.mockito.Mock;
|
|
|
+import org.mockito.runners.MockitoJUnitRunner;
|
|
|
+
|
|
|
|
|
|
+/**
|
|
|
+ * Tests the behavior of TaskAttemptListenerImpl.
|
|
|
+ */
|
|
|
+@RunWith(MockitoJUnitRunner.class)
|
|
|
public class TestTaskAttemptListenerImpl {
|
|
|
- public static class MockTaskAttemptListenerImpl extends TaskAttemptListenerImpl {
|
|
|
+ private static final String ATTEMPT1_ID =
|
|
|
+ "attempt_123456789012_0001_m_000001_0";
|
|
|
+ private static final String ATTEMPT2_ID =
|
|
|
+ "attempt_123456789012_0001_m_000002_0";
|
|
|
+
|
|
|
+ private static final TaskAttemptId TASKATTEMPTID1 =
|
|
|
+ TypeConverter.toYarn(TaskAttemptID.forName(ATTEMPT1_ID));
|
|
|
+ private static final TaskAttemptId TASKATTEMPTID2 =
|
|
|
+ TypeConverter.toYarn(TaskAttemptID.forName(ATTEMPT2_ID));
|
|
|
+
|
|
|
+ @Mock
|
|
|
+ private AppContext appCtx;
|
|
|
+
|
|
|
+ @Mock
|
|
|
+ private JobTokenSecretManager secret;
|
|
|
+
|
|
|
+ @Mock
|
|
|
+ private RMHeartbeatHandler rmHeartbeatHandler;
|
|
|
+
|
|
|
+ @Mock
|
|
|
+ private TaskHeartbeatHandler hbHandler;
|
|
|
+
|
|
|
+ @Mock
|
|
|
+ private Dispatcher dispatcher;
|
|
|
+
|
|
|
+ @Mock
|
|
|
+ private Task task;
|
|
|
+
|
|
|
+ @SuppressWarnings("rawtypes")
|
|
|
+ @Mock
|
|
|
+ private EventHandler<Event> ea;
|
|
|
+
|
|
|
+ @SuppressWarnings("rawtypes")
|
|
|
+ @Captor
|
|
|
+ private ArgumentCaptor<Event> eventCaptor;
|
|
|
+
|
|
|
+ private JVMId id;
|
|
|
+ private WrappedJvmID wid;
|
|
|
+ private TaskAttemptID attemptID;
|
|
|
+ private TaskAttemptId attemptId;
|
|
|
+ private ReduceTaskStatus firstReduceStatus;
|
|
|
+ private ReduceTaskStatus secondReduceStatus;
|
|
|
+ private ReduceTaskStatus thirdReduceStatus;
|
|
|
+
|
|
|
+ private MockTaskAttemptListenerImpl listener;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Extension of the original TaskAttemptImpl
|
|
|
+ * for testing purposes
|
|
|
+ */
|
|
|
+ public static class MockTaskAttemptListenerImpl
|
|
|
+ extends TaskAttemptListenerImpl {
|
|
|
|
|
|
public MockTaskAttemptListenerImpl(AppContext context,
|
|
|
JobTokenSecretManager jobTokenSecretManager,
|
|
@@ -85,26 +154,24 @@ public class TestTaskAttemptListenerImpl {
|
|
|
//Empty
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ @After
|
|
|
+ public void after() throws IOException {
|
|
|
+ if (listener != null) {
|
|
|
+ listener.close();
|
|
|
+ listener = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Test (timeout=5000)
|
|
|
public void testGetTask() throws IOException {
|
|
|
- AppContext appCtx = mock(AppContext.class);
|
|
|
- JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
|
|
|
- RMHeartbeatHandler rmHeartbeatHandler =
|
|
|
- mock(RMHeartbeatHandler.class);
|
|
|
- TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
|
|
|
- MockTaskAttemptListenerImpl listener =
|
|
|
- new MockTaskAttemptListenerImpl(appCtx, secret,
|
|
|
- rmHeartbeatHandler, hbHandler);
|
|
|
- Configuration conf = new Configuration();
|
|
|
- listener.init(conf);
|
|
|
- listener.start();
|
|
|
- JVMId id = new JVMId("foo",1, true, 1);
|
|
|
- WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId());
|
|
|
+ configureMocks();
|
|
|
+ startListener(false);
|
|
|
|
|
|
// Verify ask before registration.
|
|
|
//The JVM ID has not been registered yet so we should kill it.
|
|
|
JvmContext context = new JvmContext();
|
|
|
+
|
|
|
context.jvmId = id;
|
|
|
JvmTask result = listener.getTask(context);
|
|
|
assertNotNull(result);
|
|
@@ -112,20 +179,18 @@ public class TestTaskAttemptListenerImpl {
|
|
|
|
|
|
// Verify ask after registration but before launch.
|
|
|
// Don't kill, should be null.
|
|
|
- TaskAttemptId attemptID = mock(TaskAttemptId.class);
|
|
|
- Task task = mock(Task.class);
|
|
|
//Now put a task with the ID
|
|
|
listener.registerPendingTask(task, wid);
|
|
|
result = listener.getTask(context);
|
|
|
assertNull(result);
|
|
|
// Unregister for more testing.
|
|
|
- listener.unregister(attemptID, wid);
|
|
|
+ listener.unregister(attemptId, wid);
|
|
|
|
|
|
// Verify ask after registration and launch
|
|
|
//Now put a task with the ID
|
|
|
listener.registerPendingTask(task, wid);
|
|
|
- listener.registerLaunchedTask(attemptID, wid);
|
|
|
- verify(hbHandler).register(attemptID);
|
|
|
+ listener.registerLaunchedTask(attemptId, wid);
|
|
|
+ verify(hbHandler).register(attemptId);
|
|
|
result = listener.getTask(context);
|
|
|
assertNotNull(result);
|
|
|
assertFalse(result.shouldDie);
|
|
@@ -136,15 +201,13 @@ public class TestTaskAttemptListenerImpl {
|
|
|
assertNotNull(result);
|
|
|
assertTrue(result.shouldDie);
|
|
|
|
|
|
- listener.unregister(attemptID, wid);
|
|
|
+ listener.unregister(attemptId, wid);
|
|
|
|
|
|
// Verify after unregistration.
|
|
|
result = listener.getTask(context);
|
|
|
assertNotNull(result);
|
|
|
assertTrue(result.shouldDie);
|
|
|
|
|
|
- listener.stop();
|
|
|
-
|
|
|
// test JVMID
|
|
|
JVMId jvmid = JVMId.forName("jvm_001_002_m_004");
|
|
|
assertNotNull(jvmid);
|
|
@@ -190,14 +253,11 @@ public class TestTaskAttemptListenerImpl {
|
|
|
when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn(
|
|
|
TypeConverter.fromYarn(empty));
|
|
|
|
|
|
- AppContext appCtx = mock(AppContext.class);
|
|
|
+ configureMocks();
|
|
|
when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
|
|
|
- JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
|
|
|
- RMHeartbeatHandler rmHeartbeatHandler =
|
|
|
- mock(RMHeartbeatHandler.class);
|
|
|
- final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
|
|
|
- TaskAttemptListenerImpl listener =
|
|
|
- new MockTaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
|
|
|
+
|
|
|
+ listener = new MockTaskAttemptListenerImpl(
|
|
|
+ appCtx, secret, rmHeartbeatHandler, hbHandler) {
|
|
|
@Override
|
|
|
protected void registerHeartbeatHandler(Configuration conf) {
|
|
|
taskHeartbeatHandler = hbHandler;
|
|
@@ -238,20 +298,18 @@ public class TestTaskAttemptListenerImpl {
|
|
|
public void testCommitWindow() throws IOException {
|
|
|
SystemClock clock = SystemClock.getInstance();
|
|
|
|
|
|
+ configureMocks();
|
|
|
+
|
|
|
org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
|
|
|
mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
|
|
|
when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true);
|
|
|
Job mockJob = mock(Job.class);
|
|
|
when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask);
|
|
|
- AppContext appCtx = mock(AppContext.class);
|
|
|
when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
|
|
|
when(appCtx.getClock()).thenReturn(clock);
|
|
|
- JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
|
|
|
- RMHeartbeatHandler rmHeartbeatHandler =
|
|
|
- mock(RMHeartbeatHandler.class);
|
|
|
- final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
|
|
|
- TaskAttemptListenerImpl listener =
|
|
|
- new MockTaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
|
|
|
+
|
|
|
+ listener = new MockTaskAttemptListenerImpl(
|
|
|
+ appCtx, secret, rmHeartbeatHandler, hbHandler) {
|
|
|
@Override
|
|
|
protected void registerHeartbeatHandler(Configuration conf) {
|
|
|
taskHeartbeatHandler = hbHandler;
|
|
@@ -269,11 +327,119 @@ public class TestTaskAttemptListenerImpl {
|
|
|
verify(mockTask, never()).canCommit(any(TaskAttemptId.class));
|
|
|
|
|
|
// verify commit allowed when RM heartbeat is recent
|
|
|
- when(rmHeartbeatHandler.getLastHeartbeatTime()).thenReturn(clock.getTime());
|
|
|
+ when(rmHeartbeatHandler.getLastHeartbeatTime())
|
|
|
+ .thenReturn(clock.getTime());
|
|
|
canCommit = listener.canCommit(tid);
|
|
|
assertTrue(canCommit);
|
|
|
verify(mockTask, times(1)).canCommit(any(TaskAttemptId.class));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testSingleStatusUpdate()
|
|
|
+ throws IOException, InterruptedException {
|
|
|
+ configureMocks();
|
|
|
+ startListener(true);
|
|
|
|
|
|
- listener.stop();
|
|
|
+ listener.statusUpdate(attemptID, firstReduceStatus);
|
|
|
+
|
|
|
+ verify(ea).handle(eventCaptor.capture());
|
|
|
+ TaskAttemptStatusUpdateEvent updateEvent =
|
|
|
+ (TaskAttemptStatusUpdateEvent) eventCaptor.getValue();
|
|
|
+
|
|
|
+ TaskAttemptStatus status = updateEvent.getTaskAttemptStatusRef().get();
|
|
|
+ assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID1));
|
|
|
+ assertEquals(1, status.fetchFailedMaps.size());
|
|
|
+ assertEquals(Phase.SHUFFLE, status.phase);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testStatusUpdateEventCoalescing()
|
|
|
+ throws IOException, InterruptedException {
|
|
|
+ configureMocks();
|
|
|
+ startListener(true);
|
|
|
+
|
|
|
+ listener.statusUpdate(attemptID, firstReduceStatus);
|
|
|
+ listener.statusUpdate(attemptID, secondReduceStatus);
|
|
|
+
|
|
|
+ verify(ea).handle(any(Event.class));
|
|
|
+ ConcurrentMap<TaskAttemptId,
|
|
|
+ AtomicReference<TaskAttemptStatus>> attemptIdToStatus =
|
|
|
+ listener.getAttemptIdToStatus();
|
|
|
+ TaskAttemptStatus status = attemptIdToStatus.get(attemptId).get();
|
|
|
+
|
|
|
+ assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID1));
|
|
|
+ assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID2));
|
|
|
+ assertEquals(2, status.fetchFailedMaps.size());
|
|
|
+ assertEquals(Phase.SORT, status.phase);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testCoalescedStatusUpdatesCleared()
|
|
|
+ throws IOException, InterruptedException {
|
|
|
+ // First two events are coalesced, the third is not
|
|
|
+ configureMocks();
|
|
|
+ startListener(true);
|
|
|
+
|
|
|
+ listener.statusUpdate(attemptID, firstReduceStatus);
|
|
|
+ listener.statusUpdate(attemptID, secondReduceStatus);
|
|
|
+ ConcurrentMap<TaskAttemptId,
|
|
|
+ AtomicReference<TaskAttemptStatus>> attemptIdToStatus =
|
|
|
+ listener.getAttemptIdToStatus();
|
|
|
+ attemptIdToStatus.get(attemptId).set(null);
|
|
|
+ listener.statusUpdate(attemptID, thirdReduceStatus);
|
|
|
+
|
|
|
+ verify(ea, times(2)).handle(eventCaptor.capture());
|
|
|
+ TaskAttemptStatusUpdateEvent updateEvent =
|
|
|
+ (TaskAttemptStatusUpdateEvent) eventCaptor.getValue();
|
|
|
+
|
|
|
+ TaskAttemptStatus status = updateEvent.getTaskAttemptStatusRef().get();
|
|
|
+ assertNull(status.fetchFailedMaps);
|
|
|
+ assertEquals(Phase.REDUCE, status.phase);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test(expected = IllegalStateException.class)
|
|
|
+ public void testStatusUpdateFromUnregisteredTask()
|
|
|
+ throws IOException, InterruptedException{
|
|
|
+ configureMocks();
|
|
|
+ startListener(false);
|
|
|
+
|
|
|
+ listener.statusUpdate(attemptID, firstReduceStatus);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void configureMocks() {
|
|
|
+ firstReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1,
|
|
|
+ TaskStatus.State.RUNNING, "", "RUNNING", "", TaskStatus.Phase.SHUFFLE,
|
|
|
+ new Counters());
|
|
|
+ firstReduceStatus.addFetchFailedMap(TaskAttemptID.forName(ATTEMPT1_ID));
|
|
|
+
|
|
|
+ secondReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1,
|
|
|
+ TaskStatus.State.RUNNING, "", "RUNNING", "", TaskStatus.Phase.SORT,
|
|
|
+ new Counters());
|
|
|
+ secondReduceStatus.addFetchFailedMap(TaskAttemptID.forName(ATTEMPT2_ID));
|
|
|
+
|
|
|
+ thirdReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1,
|
|
|
+ TaskStatus.State.RUNNING, "", "RUNNING", "",
|
|
|
+ TaskStatus.Phase.REDUCE, new Counters());
|
|
|
+
|
|
|
+ when(dispatcher.getEventHandler()).thenReturn(ea);
|
|
|
+ when(appCtx.getEventHandler()).thenReturn(ea);
|
|
|
+ listener = new MockTaskAttemptListenerImpl(appCtx, secret,
|
|
|
+ rmHeartbeatHandler, hbHandler);
|
|
|
+ id = new JVMId("foo", 1, true, 1);
|
|
|
+ wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId());
|
|
|
+ attemptID = new TaskAttemptID("1", 1, TaskType.MAP, 1, 1);
|
|
|
+ attemptId = TypeConverter.toYarn(attemptID);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void startListener(boolean registerTask) {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+
|
|
|
+ listener.init(conf);
|
|
|
+ listener.start();
|
|
|
+
|
|
|
+ if (registerTask) {
|
|
|
+ listener.registerPendingTask(task, wid);
|
|
|
+ listener.registerLaunchedTask(attemptId, wid);
|
|
|
+ }
|
|
|
}
|
|
|
}
|