|
@@ -31,15 +31,14 @@ import static org.mockito.Mockito.when;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.Arrays;
|
|
|
-import java.util.concurrent.ConcurrentMap;
|
|
|
-import java.util.concurrent.atomic.AtomicReference;
|
|
|
+
|
|
|
+import junit.framework.Assert;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.mapreduce.TaskType;
|
|
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
|
|
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
|
|
-import org.apache.hadoop.mapreduce.v2.api.records.Phase;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
|
@@ -47,83 +46,15 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
|
|
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
|
|
|
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
|
|
|
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
|
|
-import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
-import org.apache.hadoop.yarn.event.Event;
|
|
|
-import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
import org.apache.hadoop.yarn.util.SystemClock;
|
|
|
-import org.junit.After;
|
|
|
-import org.junit.Assert;
|
|
|
import org.junit.Test;
|
|
|
-import org.junit.runner.RunWith;
|
|
|
-import org.mockito.ArgumentCaptor;
|
|
|
-import org.mockito.Captor;
|
|
|
-import org.mockito.Mock;
|
|
|
-import org.mockito.runners.MockitoJUnitRunner;
|
|
|
-
|
|
|
|
|
|
-/**
|
|
|
- * Tests the behavior of TaskAttemptListenerImpl.
|
|
|
- */
|
|
|
-@RunWith(MockitoJUnitRunner.class)
|
|
|
public class TestTaskAttemptListenerImpl {
|
|
|
- private static final String ATTEMPT1_ID =
|
|
|
- "attempt_123456789012_0001_m_000001_0";
|
|
|
- private static final String ATTEMPT2_ID =
|
|
|
- "attempt_123456789012_0001_m_000002_0";
|
|
|
-
|
|
|
- private static final TaskAttemptId TASKATTEMPTID1 =
|
|
|
- TypeConverter.toYarn(TaskAttemptID.forName(ATTEMPT1_ID));
|
|
|
- private static final TaskAttemptId TASKATTEMPTID2 =
|
|
|
- TypeConverter.toYarn(TaskAttemptID.forName(ATTEMPT2_ID));
|
|
|
-
|
|
|
- @Mock
|
|
|
- private AppContext appCtx;
|
|
|
-
|
|
|
- @Mock
|
|
|
- private JobTokenSecretManager secret;
|
|
|
-
|
|
|
- @Mock
|
|
|
- private RMHeartbeatHandler rmHeartbeatHandler;
|
|
|
-
|
|
|
- @Mock
|
|
|
- private TaskHeartbeatHandler hbHandler;
|
|
|
-
|
|
|
- @Mock
|
|
|
- private Dispatcher dispatcher;
|
|
|
-
|
|
|
- @Mock
|
|
|
- private Task task;
|
|
|
-
|
|
|
- @SuppressWarnings("rawtypes")
|
|
|
- @Mock
|
|
|
- private EventHandler<Event> ea;
|
|
|
-
|
|
|
- @SuppressWarnings("rawtypes")
|
|
|
- @Captor
|
|
|
- private ArgumentCaptor<Event> eventCaptor;
|
|
|
-
|
|
|
- private JVMId id;
|
|
|
- private WrappedJvmID wid;
|
|
|
- private TaskAttemptID attemptID;
|
|
|
- private TaskAttemptId attemptId;
|
|
|
- private ReduceTaskStatus firstReduceStatus;
|
|
|
- private ReduceTaskStatus secondReduceStatus;
|
|
|
- private ReduceTaskStatus thirdReduceStatus;
|
|
|
-
|
|
|
- private MockTaskAttemptListenerImpl listener;
|
|
|
-
|
|
|
- /**
|
|
|
- * Extension of the original TaskAttemptImpl
|
|
|
- * for testing purposes
|
|
|
- */
|
|
|
- public static class MockTaskAttemptListenerImpl
|
|
|
- extends TaskAttemptListenerImpl {
|
|
|
+ public static class MockTaskAttemptListenerImpl extends TaskAttemptListenerImpl {
|
|
|
|
|
|
public MockTaskAttemptListenerImpl(AppContext context,
|
|
|
JobTokenSecretManager jobTokenSecretManager,
|
|
@@ -154,24 +85,26 @@ public class TestTaskAttemptListenerImpl {
|
|
|
//Empty
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- @After
|
|
|
- public void after() throws IOException {
|
|
|
- if (listener != null) {
|
|
|
- listener.close();
|
|
|
- listener = null;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
+
|
|
|
@Test (timeout=5000)
|
|
|
public void testGetTask() throws IOException {
|
|
|
- configureMocks();
|
|
|
- startListener(false);
|
|
|
+ AppContext appCtx = mock(AppContext.class);
|
|
|
+ JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
|
|
|
+ RMHeartbeatHandler rmHeartbeatHandler =
|
|
|
+ mock(RMHeartbeatHandler.class);
|
|
|
+ TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
|
|
|
+ MockTaskAttemptListenerImpl listener =
|
|
|
+ new MockTaskAttemptListenerImpl(appCtx, secret,
|
|
|
+ rmHeartbeatHandler, hbHandler);
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ listener.init(conf);
|
|
|
+ listener.start();
|
|
|
+ JVMId id = new JVMId("foo",1, true, 1);
|
|
|
+ WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId());
|
|
|
|
|
|
// Verify ask before registration.
|
|
|
//The JVM ID has not been registered yet so we should kill it.
|
|
|
JvmContext context = new JvmContext();
|
|
|
-
|
|
|
context.jvmId = id;
|
|
|
JvmTask result = listener.getTask(context);
|
|
|
assertNotNull(result);
|
|
@@ -179,18 +112,20 @@ public class TestTaskAttemptListenerImpl {
|
|
|
|
|
|
// Verify ask after registration but before launch.
|
|
|
// Don't kill, should be null.
|
|
|
+ TaskAttemptId attemptID = mock(TaskAttemptId.class);
|
|
|
+ Task task = mock(Task.class);
|
|
|
//Now put a task with the ID
|
|
|
listener.registerPendingTask(task, wid);
|
|
|
result = listener.getTask(context);
|
|
|
assertNull(result);
|
|
|
// Unregister for more testing.
|
|
|
- listener.unregister(attemptId, wid);
|
|
|
+ listener.unregister(attemptID, wid);
|
|
|
|
|
|
// Verify ask after registration and launch
|
|
|
//Now put a task with the ID
|
|
|
listener.registerPendingTask(task, wid);
|
|
|
- listener.registerLaunchedTask(attemptId, wid);
|
|
|
- verify(hbHandler).register(attemptId);
|
|
|
+ listener.registerLaunchedTask(attemptID, wid);
|
|
|
+ verify(hbHandler).register(attemptID);
|
|
|
result = listener.getTask(context);
|
|
|
assertNotNull(result);
|
|
|
assertFalse(result.shouldDie);
|
|
@@ -201,13 +136,15 @@ public class TestTaskAttemptListenerImpl {
|
|
|
assertNotNull(result);
|
|
|
assertTrue(result.shouldDie);
|
|
|
|
|
|
- listener.unregister(attemptId, wid);
|
|
|
+ listener.unregister(attemptID, wid);
|
|
|
|
|
|
// Verify after unregistration.
|
|
|
result = listener.getTask(context);
|
|
|
assertNotNull(result);
|
|
|
assertTrue(result.shouldDie);
|
|
|
|
|
|
+ listener.stop();
|
|
|
+
|
|
|
// test JVMID
|
|
|
JVMId jvmid = JVMId.forName("jvm_001_002_m_004");
|
|
|
assertNotNull(jvmid);
|
|
@@ -253,11 +190,14 @@ public class TestTaskAttemptListenerImpl {
|
|
|
when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn(
|
|
|
TypeConverter.fromYarn(empty));
|
|
|
|
|
|
- configureMocks();
|
|
|
+ AppContext appCtx = mock(AppContext.class);
|
|
|
when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
|
|
|
-
|
|
|
- listener = new MockTaskAttemptListenerImpl(
|
|
|
- appCtx, secret, rmHeartbeatHandler, hbHandler) {
|
|
|
+ JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
|
|
|
+ RMHeartbeatHandler rmHeartbeatHandler =
|
|
|
+ mock(RMHeartbeatHandler.class);
|
|
|
+ final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
|
|
|
+ TaskAttemptListenerImpl listener =
|
|
|
+ new MockTaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
|
|
|
@Override
|
|
|
protected void registerHeartbeatHandler(Configuration conf) {
|
|
|
taskHeartbeatHandler = hbHandler;
|
|
@@ -298,18 +238,20 @@ public class TestTaskAttemptListenerImpl {
|
|
|
public void testCommitWindow() throws IOException {
|
|
|
SystemClock clock = new SystemClock();
|
|
|
|
|
|
- configureMocks();
|
|
|
-
|
|
|
org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
|
|
|
mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
|
|
|
when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true);
|
|
|
Job mockJob = mock(Job.class);
|
|
|
when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask);
|
|
|
+ AppContext appCtx = mock(AppContext.class);
|
|
|
when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
|
|
|
when(appCtx.getClock()).thenReturn(clock);
|
|
|
-
|
|
|
- listener = new MockTaskAttemptListenerImpl(
|
|
|
- appCtx, secret, rmHeartbeatHandler, hbHandler) {
|
|
|
+ JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
|
|
|
+ RMHeartbeatHandler rmHeartbeatHandler =
|
|
|
+ mock(RMHeartbeatHandler.class);
|
|
|
+ final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
|
|
|
+ TaskAttemptListenerImpl listener =
|
|
|
+ new MockTaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
|
|
|
@Override
|
|
|
protected void registerHeartbeatHandler(Configuration conf) {
|
|
|
taskHeartbeatHandler = hbHandler;
|
|
@@ -327,119 +269,11 @@ public class TestTaskAttemptListenerImpl {
|
|
|
verify(mockTask, never()).canCommit(any(TaskAttemptId.class));
|
|
|
|
|
|
// verify commit allowed when RM heartbeat is recent
|
|
|
- when(rmHeartbeatHandler.getLastHeartbeatTime())
|
|
|
- .thenReturn(clock.getTime());
|
|
|
+ when(rmHeartbeatHandler.getLastHeartbeatTime()).thenReturn(clock.getTime());
|
|
|
canCommit = listener.canCommit(tid);
|
|
|
assertTrue(canCommit);
|
|
|
verify(mockTask, times(1)).canCommit(any(TaskAttemptId.class));
|
|
|
- }
|
|
|
-
|
|
|
- @Test
|
|
|
- public void testSingleStatusUpdate()
|
|
|
- throws IOException, InterruptedException {
|
|
|
- configureMocks();
|
|
|
- startListener(true);
|
|
|
|
|
|
- listener.statusUpdate(attemptID, firstReduceStatus);
|
|
|
-
|
|
|
- verify(ea).handle(eventCaptor.capture());
|
|
|
- TaskAttemptStatusUpdateEvent updateEvent =
|
|
|
- (TaskAttemptStatusUpdateEvent) eventCaptor.getValue();
|
|
|
-
|
|
|
- TaskAttemptStatus status = updateEvent.getTaskAttemptStatusRef().get();
|
|
|
- assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID1));
|
|
|
- assertEquals(1, status.fetchFailedMaps.size());
|
|
|
- assertEquals(Phase.SHUFFLE, status.phase);
|
|
|
- }
|
|
|
-
|
|
|
- @Test
|
|
|
- public void testStatusUpdateEventCoalescing()
|
|
|
- throws IOException, InterruptedException {
|
|
|
- configureMocks();
|
|
|
- startListener(true);
|
|
|
-
|
|
|
- listener.statusUpdate(attemptID, firstReduceStatus);
|
|
|
- listener.statusUpdate(attemptID, secondReduceStatus);
|
|
|
-
|
|
|
- verify(ea).handle(any(Event.class));
|
|
|
- ConcurrentMap<TaskAttemptId,
|
|
|
- AtomicReference<TaskAttemptStatus>> attemptIdToStatus =
|
|
|
- listener.getAttemptIdToStatus();
|
|
|
- TaskAttemptStatus status = attemptIdToStatus.get(attemptId).get();
|
|
|
-
|
|
|
- assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID1));
|
|
|
- assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID2));
|
|
|
- assertEquals(2, status.fetchFailedMaps.size());
|
|
|
- assertEquals(Phase.SORT, status.phase);
|
|
|
- }
|
|
|
-
|
|
|
- @Test
|
|
|
- public void testCoalescedStatusUpdatesCleared()
|
|
|
- throws IOException, InterruptedException {
|
|
|
- // First two events are coalesced, the third is not
|
|
|
- configureMocks();
|
|
|
- startListener(true);
|
|
|
-
|
|
|
- listener.statusUpdate(attemptID, firstReduceStatus);
|
|
|
- listener.statusUpdate(attemptID, secondReduceStatus);
|
|
|
- ConcurrentMap<TaskAttemptId,
|
|
|
- AtomicReference<TaskAttemptStatus>> attemptIdToStatus =
|
|
|
- listener.getAttemptIdToStatus();
|
|
|
- attemptIdToStatus.get(attemptId).set(null);
|
|
|
- listener.statusUpdate(attemptID, thirdReduceStatus);
|
|
|
-
|
|
|
- verify(ea, times(2)).handle(eventCaptor.capture());
|
|
|
- TaskAttemptStatusUpdateEvent updateEvent =
|
|
|
- (TaskAttemptStatusUpdateEvent) eventCaptor.getValue();
|
|
|
-
|
|
|
- TaskAttemptStatus status = updateEvent.getTaskAttemptStatusRef().get();
|
|
|
- assertNull(status.fetchFailedMaps);
|
|
|
- assertEquals(Phase.REDUCE, status.phase);
|
|
|
- }
|
|
|
-
|
|
|
- @Test(expected = IllegalStateException.class)
|
|
|
- public void testStatusUpdateFromUnregisteredTask()
|
|
|
- throws IOException, InterruptedException{
|
|
|
- configureMocks();
|
|
|
- startListener(false);
|
|
|
-
|
|
|
- listener.statusUpdate(attemptID, firstReduceStatus);
|
|
|
- }
|
|
|
-
|
|
|
- private void configureMocks() {
|
|
|
- firstReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1,
|
|
|
- TaskStatus.State.RUNNING, "", "RUNNING", "", TaskStatus.Phase.SHUFFLE,
|
|
|
- new Counters());
|
|
|
- firstReduceStatus.addFetchFailedMap(TaskAttemptID.forName(ATTEMPT1_ID));
|
|
|
-
|
|
|
- secondReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1,
|
|
|
- TaskStatus.State.RUNNING, "", "RUNNING", "", TaskStatus.Phase.SORT,
|
|
|
- new Counters());
|
|
|
- secondReduceStatus.addFetchFailedMap(TaskAttemptID.forName(ATTEMPT2_ID));
|
|
|
-
|
|
|
- thirdReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1,
|
|
|
- TaskStatus.State.RUNNING, "", "RUNNING", "",
|
|
|
- TaskStatus.Phase.REDUCE, new Counters());
|
|
|
-
|
|
|
- when(dispatcher.getEventHandler()).thenReturn(ea);
|
|
|
- when(appCtx.getEventHandler()).thenReturn(ea);
|
|
|
- listener = new MockTaskAttemptListenerImpl(appCtx, secret,
|
|
|
- rmHeartbeatHandler, hbHandler);
|
|
|
- id = new JVMId("foo", 1, true, 1);
|
|
|
- wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId());
|
|
|
- attemptID = new TaskAttemptID("1", 1, TaskType.MAP, 1, 1);
|
|
|
- attemptId = TypeConverter.toYarn(attemptID);
|
|
|
- }
|
|
|
-
|
|
|
- private void startListener(boolean registerTask) {
|
|
|
- Configuration conf = new Configuration();
|
|
|
-
|
|
|
- listener.init(conf);
|
|
|
- listener.start();
|
|
|
-
|
|
|
- if (registerTask) {
|
|
|
- listener.registerPendingTask(task, wid);
|
|
|
- listener.registerLaunchedTask(attemptId, wid);
|
|
|
- }
|
|
|
+ listener.stop();
|
|
|
}
|
|
|
}
|