|
@@ -24,6 +24,8 @@ import java.io.IOException;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.Arrays;
|
|
import java.util.Arrays;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
+import java.util.concurrent.ConcurrentMap;
|
|
|
|
+import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
@@ -35,6 +37,7 @@ import org.apache.hadoop.mapreduce.checkpoint.FSCheckpointID;
|
|
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
|
|
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
|
|
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
|
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
|
@@ -42,6 +45,8 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
|
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
|
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
|
import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
|
|
import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
|
|
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
|
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
|
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
|
|
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
|
|
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
|
|
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy;
|
|
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy;
|
|
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
|
|
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
|
|
@@ -52,12 +57,69 @@ import org.apache.hadoop.yarn.event.EventHandler;
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
import org.apache.hadoop.yarn.util.SystemClock;
|
|
import org.apache.hadoop.yarn.util.SystemClock;
|
|
-
|
|
|
|
|
|
+import org.junit.After;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
|
|
+import org.junit.runner.RunWith;
|
|
|
|
+import org.mockito.ArgumentCaptor;
|
|
|
|
+import org.mockito.Captor;
|
|
|
|
+import org.mockito.Mock;
|
|
|
|
+import org.mockito.runners.MockitoJUnitRunner;
|
|
|
|
+
|
|
import static org.junit.Assert.*;
|
|
import static org.junit.Assert.*;
|
|
import static org.mockito.Mockito.*;
|
|
import static org.mockito.Mockito.*;
|
|
|
|
|
|
|
|
+/**
|
|
|
|
+ * Tests the behavior of TaskAttemptListenerImpl.
|
|
|
|
+ */
|
|
|
|
+@RunWith(MockitoJUnitRunner.class)
|
|
public class TestTaskAttemptListenerImpl {
|
|
public class TestTaskAttemptListenerImpl {
|
|
|
|
+ private static final String ATTEMPT1_ID =
|
|
|
|
+ "attempt_123456789012_0001_m_000001_0";
|
|
|
|
+ private static final String ATTEMPT2_ID =
|
|
|
|
+ "attempt_123456789012_0001_m_000002_0";
|
|
|
|
+
|
|
|
|
+ private static final TaskAttemptId TASKATTEMPTID1 =
|
|
|
|
+ TypeConverter.toYarn(TaskAttemptID.forName(ATTEMPT1_ID));
|
|
|
|
+ private static final TaskAttemptId TASKATTEMPTID2 =
|
|
|
|
+ TypeConverter.toYarn(TaskAttemptID.forName(ATTEMPT2_ID));
|
|
|
|
+
|
|
|
|
+ @Mock
|
|
|
|
+ private AppContext appCtx;
|
|
|
|
+
|
|
|
|
+ @Mock
|
|
|
|
+ private JobTokenSecretManager secret;
|
|
|
|
+
|
|
|
|
+ @Mock
|
|
|
|
+ private RMHeartbeatHandler rmHeartbeatHandler;
|
|
|
|
+
|
|
|
|
+ @Mock
|
|
|
|
+ private TaskHeartbeatHandler hbHandler;
|
|
|
|
+
|
|
|
|
+ @Mock
|
|
|
|
+ private Dispatcher dispatcher;
|
|
|
|
+
|
|
|
|
+ @Mock
|
|
|
|
+ private Task task;
|
|
|
|
+
|
|
|
|
+ @SuppressWarnings("rawtypes")
|
|
|
|
+ @Mock
|
|
|
|
+ private EventHandler<Event> ea;
|
|
|
|
+
|
|
|
|
+ @SuppressWarnings("rawtypes")
|
|
|
|
+ @Captor
|
|
|
|
+ private ArgumentCaptor<Event> eventCaptor;
|
|
|
|
+
|
|
|
|
+ private CheckpointAMPreemptionPolicy policy;
|
|
|
|
+ private JVMId id;
|
|
|
|
+ private WrappedJvmID wid;
|
|
|
|
+ private TaskAttemptID attemptID;
|
|
|
|
+ private TaskAttemptId attemptId;
|
|
|
|
+ private ReduceTaskStatus firstReduceStatus;
|
|
|
|
+ private ReduceTaskStatus secondReduceStatus;
|
|
|
|
+ private ReduceTaskStatus thirdReduceStatus;
|
|
|
|
+
|
|
|
|
+ private MockTaskAttemptListenerImpl listener;
|
|
|
|
+
|
|
public static class MockTaskAttemptListenerImpl
|
|
public static class MockTaskAttemptListenerImpl
|
|
extends TaskAttemptListenerImpl {
|
|
extends TaskAttemptListenerImpl {
|
|
|
|
|
|
@@ -93,34 +155,24 @@ public class TestTaskAttemptListenerImpl {
|
|
//Empty
|
|
//Empty
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ @After
|
|
|
|
+ public void after() throws IOException {
|
|
|
|
+ if (listener != null) {
|
|
|
|
+ listener.close();
|
|
|
|
+ listener = null;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
@Test (timeout=5000)
|
|
@Test (timeout=5000)
|
|
public void testGetTask() throws IOException {
|
|
public void testGetTask() throws IOException {
|
|
- AppContext appCtx = mock(AppContext.class);
|
|
|
|
- JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
|
|
|
|
- RMHeartbeatHandler rmHeartbeatHandler =
|
|
|
|
- mock(RMHeartbeatHandler.class);
|
|
|
|
- TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
|
|
|
|
- Dispatcher dispatcher = mock(Dispatcher.class);
|
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
|
- EventHandler<Event> ea = mock(EventHandler.class);
|
|
|
|
- when(dispatcher.getEventHandler()).thenReturn(ea);
|
|
|
|
-
|
|
|
|
- when(appCtx.getEventHandler()).thenReturn(ea);
|
|
|
|
- CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
|
|
|
|
- policy.init(appCtx);
|
|
|
|
- MockTaskAttemptListenerImpl listener =
|
|
|
|
- new MockTaskAttemptListenerImpl(appCtx, secret,
|
|
|
|
- rmHeartbeatHandler, hbHandler, policy);
|
|
|
|
- Configuration conf = new Configuration();
|
|
|
|
- listener.init(conf);
|
|
|
|
- listener.start();
|
|
|
|
- JVMId id = new JVMId("foo",1, true, 1);
|
|
|
|
- WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId());
|
|
|
|
|
|
+ configureMocks();
|
|
|
|
+ startListener(false);
|
|
|
|
|
|
// Verify ask before registration.
|
|
// Verify ask before registration.
|
|
//The JVM ID has not been registered yet so we should kill it.
|
|
//The JVM ID has not been registered yet so we should kill it.
|
|
JvmContext context = new JvmContext();
|
|
JvmContext context = new JvmContext();
|
|
|
|
+
|
|
context.jvmId = id;
|
|
context.jvmId = id;
|
|
JvmTask result = listener.getTask(context);
|
|
JvmTask result = listener.getTask(context);
|
|
assertNotNull(result);
|
|
assertNotNull(result);
|
|
@@ -128,20 +180,18 @@ public class TestTaskAttemptListenerImpl {
|
|
|
|
|
|
// Verify ask after registration but before launch.
|
|
// Verify ask after registration but before launch.
|
|
// Don't kill, should be null.
|
|
// Don't kill, should be null.
|
|
- TaskAttemptId attemptID = mock(TaskAttemptId.class);
|
|
|
|
- Task task = mock(Task.class);
|
|
|
|
//Now put a task with the ID
|
|
//Now put a task with the ID
|
|
listener.registerPendingTask(task, wid);
|
|
listener.registerPendingTask(task, wid);
|
|
result = listener.getTask(context);
|
|
result = listener.getTask(context);
|
|
assertNull(result);
|
|
assertNull(result);
|
|
// Unregister for more testing.
|
|
// Unregister for more testing.
|
|
- listener.unregister(attemptID, wid);
|
|
|
|
|
|
+ listener.unregister(attemptId, wid);
|
|
|
|
|
|
// Verify ask after registration and launch
|
|
// Verify ask after registration and launch
|
|
//Now put a task with the ID
|
|
//Now put a task with the ID
|
|
listener.registerPendingTask(task, wid);
|
|
listener.registerPendingTask(task, wid);
|
|
- listener.registerLaunchedTask(attemptID, wid);
|
|
|
|
- verify(hbHandler).register(attemptID);
|
|
|
|
|
|
+ listener.registerLaunchedTask(attemptId, wid);
|
|
|
|
+ verify(hbHandler).register(attemptId);
|
|
result = listener.getTask(context);
|
|
result = listener.getTask(context);
|
|
assertNotNull(result);
|
|
assertNotNull(result);
|
|
assertFalse(result.shouldDie);
|
|
assertFalse(result.shouldDie);
|
|
@@ -152,15 +202,13 @@ public class TestTaskAttemptListenerImpl {
|
|
assertNotNull(result);
|
|
assertNotNull(result);
|
|
assertTrue(result.shouldDie);
|
|
assertTrue(result.shouldDie);
|
|
|
|
|
|
- listener.unregister(attemptID, wid);
|
|
|
|
|
|
+ listener.unregister(attemptId, wid);
|
|
|
|
|
|
// Verify after unregistration.
|
|
// Verify after unregistration.
|
|
result = listener.getTask(context);
|
|
result = listener.getTask(context);
|
|
assertNotNull(result);
|
|
assertNotNull(result);
|
|
assertTrue(result.shouldDie);
|
|
assertTrue(result.shouldDie);
|
|
|
|
|
|
- listener.stop();
|
|
|
|
-
|
|
|
|
// test JVMID
|
|
// test JVMID
|
|
JVMId jvmid = JVMId.forName("jvm_001_002_m_004");
|
|
JVMId jvmid = JVMId.forName("jvm_001_002_m_004");
|
|
assertNotNull(jvmid);
|
|
assertNotNull(jvmid);
|
|
@@ -206,20 +254,10 @@ public class TestTaskAttemptListenerImpl {
|
|
when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn(
|
|
when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn(
|
|
TypeConverter.fromYarn(empty));
|
|
TypeConverter.fromYarn(empty));
|
|
|
|
|
|
- AppContext appCtx = mock(AppContext.class);
|
|
|
|
|
|
+ configureMocks();
|
|
when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
|
|
when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
|
|
- JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
|
|
|
|
- RMHeartbeatHandler rmHeartbeatHandler =
|
|
|
|
- mock(RMHeartbeatHandler.class);
|
|
|
|
- final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
|
|
|
|
- Dispatcher dispatcher = mock(Dispatcher.class);
|
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
|
- EventHandler<Event> ea = mock(EventHandler.class);
|
|
|
|
- when(dispatcher.getEventHandler()).thenReturn(ea);
|
|
|
|
- when(appCtx.getEventHandler()).thenReturn(ea);
|
|
|
|
- CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
|
|
|
|
- policy.init(appCtx);
|
|
|
|
- TaskAttemptListenerImpl listener = new MockTaskAttemptListenerImpl(
|
|
|
|
|
|
+
|
|
|
|
+ listener = new MockTaskAttemptListenerImpl(
|
|
appCtx, secret, rmHeartbeatHandler, policy) {
|
|
appCtx, secret, rmHeartbeatHandler, policy) {
|
|
@Override
|
|
@Override
|
|
protected void registerHeartbeatHandler(Configuration conf) {
|
|
protected void registerHeartbeatHandler(Configuration conf) {
|
|
@@ -262,26 +300,17 @@ public class TestTaskAttemptListenerImpl {
|
|
public void testCommitWindow() throws IOException {
|
|
public void testCommitWindow() throws IOException {
|
|
SystemClock clock = SystemClock.getInstance();
|
|
SystemClock clock = SystemClock.getInstance();
|
|
|
|
|
|
|
|
+ configureMocks();
|
|
|
|
+
|
|
org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
|
|
org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
|
|
mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
|
|
mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
|
|
when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true);
|
|
when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true);
|
|
Job mockJob = mock(Job.class);
|
|
Job mockJob = mock(Job.class);
|
|
when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask);
|
|
when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask);
|
|
- AppContext appCtx = mock(AppContext.class);
|
|
|
|
when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
|
|
when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
|
|
when(appCtx.getClock()).thenReturn(clock);
|
|
when(appCtx.getClock()).thenReturn(clock);
|
|
- JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
|
|
|
|
- RMHeartbeatHandler rmHeartbeatHandler =
|
|
|
|
- mock(RMHeartbeatHandler.class);
|
|
|
|
- final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
|
|
|
|
- Dispatcher dispatcher = mock(Dispatcher.class);
|
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
|
- EventHandler<Event> ea = mock(EventHandler.class);
|
|
|
|
- when(dispatcher.getEventHandler()).thenReturn(ea);
|
|
|
|
- when(appCtx.getEventHandler()).thenReturn(ea);
|
|
|
|
- CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
|
|
|
|
- policy.init(appCtx);
|
|
|
|
- TaskAttemptListenerImpl listener = new MockTaskAttemptListenerImpl(
|
|
|
|
|
|
+
|
|
|
|
+ listener = new MockTaskAttemptListenerImpl(
|
|
appCtx, secret, rmHeartbeatHandler, policy) {
|
|
appCtx, secret, rmHeartbeatHandler, policy) {
|
|
@Override
|
|
@Override
|
|
protected void registerHeartbeatHandler(Configuration conf) {
|
|
protected void registerHeartbeatHandler(Configuration conf) {
|
|
@@ -300,44 +329,29 @@ public class TestTaskAttemptListenerImpl {
|
|
verify(mockTask, never()).canCommit(any(TaskAttemptId.class));
|
|
verify(mockTask, never()).canCommit(any(TaskAttemptId.class));
|
|
|
|
|
|
// verify commit allowed when RM heartbeat is recent
|
|
// verify commit allowed when RM heartbeat is recent
|
|
- when(rmHeartbeatHandler.getLastHeartbeatTime()).thenReturn(clock.getTime());
|
|
|
|
|
|
+ when(rmHeartbeatHandler.getLastHeartbeatTime())
|
|
|
|
+ .thenReturn(clock.getTime());
|
|
canCommit = listener.canCommit(tid);
|
|
canCommit = listener.canCommit(tid);
|
|
assertTrue(canCommit);
|
|
assertTrue(canCommit);
|
|
verify(mockTask, times(1)).canCommit(any(TaskAttemptId.class));
|
|
verify(mockTask, times(1)).canCommit(any(TaskAttemptId.class));
|
|
-
|
|
|
|
- listener.stop();
|
|
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testCheckpointIDTracking()
|
|
public void testCheckpointIDTracking()
|
|
throws IOException, InterruptedException{
|
|
throws IOException, InterruptedException{
|
|
-
|
|
|
|
SystemClock clock = SystemClock.getInstance();
|
|
SystemClock clock = SystemClock.getInstance();
|
|
|
|
|
|
|
|
+ configureMocks();
|
|
|
|
+
|
|
org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
|
|
org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
|
|
mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
|
|
mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
|
|
when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true);
|
|
when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true);
|
|
Job mockJob = mock(Job.class);
|
|
Job mockJob = mock(Job.class);
|
|
when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask);
|
|
when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask);
|
|
-
|
|
|
|
- Dispatcher dispatcher = mock(Dispatcher.class);
|
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
|
- EventHandler<Event> ea = mock(EventHandler.class);
|
|
|
|
- when(dispatcher.getEventHandler()).thenReturn(ea);
|
|
|
|
-
|
|
|
|
- RMHeartbeatHandler rmHeartbeatHandler =
|
|
|
|
- mock(RMHeartbeatHandler.class);
|
|
|
|
-
|
|
|
|
- AppContext appCtx = mock(AppContext.class);
|
|
|
|
when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
|
|
when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
|
|
when(appCtx.getClock()).thenReturn(clock);
|
|
when(appCtx.getClock()).thenReturn(clock);
|
|
- when(appCtx.getEventHandler()).thenReturn(ea);
|
|
|
|
- JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
|
|
|
|
- final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
|
|
|
|
- when(appCtx.getEventHandler()).thenReturn(ea);
|
|
|
|
- CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
|
|
|
|
- policy.init(appCtx);
|
|
|
|
- TaskAttemptListenerImpl listener = new MockTaskAttemptListenerImpl(
|
|
|
|
|
|
+
|
|
|
|
+ listener = new MockTaskAttemptListenerImpl(
|
|
appCtx, secret, rmHeartbeatHandler, policy) {
|
|
appCtx, secret, rmHeartbeatHandler, policy) {
|
|
@Override
|
|
@Override
|
|
protected void registerHeartbeatHandler(Configuration conf) {
|
|
protected void registerHeartbeatHandler(Configuration conf) {
|
|
@@ -387,42 +401,13 @@ public class TestTaskAttemptListenerImpl {
|
|
|
|
|
|
//assert it worked
|
|
//assert it worked
|
|
assert outcid == incid;
|
|
assert outcid == incid;
|
|
-
|
|
|
|
- listener.stop();
|
|
|
|
-
|
|
|
|
}
|
|
}
|
|
|
|
|
|
- @SuppressWarnings("rawtypes")
|
|
|
|
@Test
|
|
@Test
|
|
public void testStatusUpdateProgress()
|
|
public void testStatusUpdateProgress()
|
|
throws IOException, InterruptedException {
|
|
throws IOException, InterruptedException {
|
|
- AppContext appCtx = mock(AppContext.class);
|
|
|
|
- JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
|
|
|
|
- RMHeartbeatHandler rmHeartbeatHandler =
|
|
|
|
- mock(RMHeartbeatHandler.class);
|
|
|
|
- TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
|
|
|
|
- Dispatcher dispatcher = mock(Dispatcher.class);
|
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
|
- EventHandler<Event> ea = mock(EventHandler.class);
|
|
|
|
- when(dispatcher.getEventHandler()).thenReturn(ea);
|
|
|
|
-
|
|
|
|
- when(appCtx.getEventHandler()).thenReturn(ea);
|
|
|
|
- CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
|
|
|
|
- policy.init(appCtx);
|
|
|
|
- MockTaskAttemptListenerImpl listener =
|
|
|
|
- new MockTaskAttemptListenerImpl(appCtx, secret,
|
|
|
|
- rmHeartbeatHandler, hbHandler, policy);
|
|
|
|
- Configuration conf = new Configuration();
|
|
|
|
- listener.init(conf);
|
|
|
|
- listener.start();
|
|
|
|
- JVMId id = new JVMId("foo",1, true, 1);
|
|
|
|
- WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId());
|
|
|
|
-
|
|
|
|
- TaskAttemptID attemptID = new TaskAttemptID("1", 1, TaskType.MAP, 1, 1);
|
|
|
|
- TaskAttemptId attemptId = TypeConverter.toYarn(attemptID);
|
|
|
|
- Task task = mock(Task.class);
|
|
|
|
- listener.registerPendingTask(task, wid);
|
|
|
|
- listener.registerLaunchedTask(attemptId, wid);
|
|
|
|
|
|
+ configureMocks();
|
|
|
|
+ startListener(true);
|
|
verify(hbHandler).register(attemptId);
|
|
verify(hbHandler).register(attemptId);
|
|
|
|
|
|
// make sure a ping doesn't report progress
|
|
// make sure a ping doesn't report progress
|
|
@@ -437,6 +422,116 @@ public class TestTaskAttemptListenerImpl {
|
|
feedback = listener.statusUpdate(attemptID, mockStatus);
|
|
feedback = listener.statusUpdate(attemptID, mockStatus);
|
|
assertTrue(feedback.getTaskFound());
|
|
assertTrue(feedback.getTaskFound());
|
|
verify(hbHandler).progressing(eq(attemptId));
|
|
verify(hbHandler).progressing(eq(attemptId));
|
|
- listener.close();
|
|
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testSingleStatusUpdate()
|
|
|
|
+ throws IOException, InterruptedException {
|
|
|
|
+ configureMocks();
|
|
|
|
+ startListener(true);
|
|
|
|
+
|
|
|
|
+ listener.statusUpdate(attemptID, firstReduceStatus);
|
|
|
|
+
|
|
|
|
+ verify(ea).handle(eventCaptor.capture());
|
|
|
|
+ TaskAttemptStatusUpdateEvent updateEvent =
|
|
|
|
+ (TaskAttemptStatusUpdateEvent) eventCaptor.getValue();
|
|
|
|
+
|
|
|
|
+ TaskAttemptStatus status = updateEvent.getTaskAttemptStatusRef().get();
|
|
|
|
+ assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID1));
|
|
|
|
+ assertEquals(1, status.fetchFailedMaps.size());
|
|
|
|
+ assertEquals(Phase.SHUFFLE, status.phase);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testStatusUpdateEventCoalescing()
|
|
|
|
+ throws IOException, InterruptedException {
|
|
|
|
+ configureMocks();
|
|
|
|
+ startListener(true);
|
|
|
|
+
|
|
|
|
+ listener.statusUpdate(attemptID, firstReduceStatus);
|
|
|
|
+ listener.statusUpdate(attemptID, secondReduceStatus);
|
|
|
|
+
|
|
|
|
+ verify(ea).handle(any(Event.class));
|
|
|
|
+ ConcurrentMap<TaskAttemptId,
|
|
|
|
+ AtomicReference<TaskAttemptStatus>> attemptIdToStatus =
|
|
|
|
+ listener.getAttemptIdToStatus();
|
|
|
|
+ TaskAttemptStatus status = attemptIdToStatus.get(attemptId).get();
|
|
|
|
+
|
|
|
|
+ assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID1));
|
|
|
|
+ assertTrue(status.fetchFailedMaps.contains(TASKATTEMPTID2));
|
|
|
|
+ assertEquals(2, status.fetchFailedMaps.size());
|
|
|
|
+ assertEquals(Phase.SORT, status.phase);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testCoalescedStatusUpdatesCleared()
|
|
|
|
+ throws IOException, InterruptedException {
|
|
|
|
+ // First two events are coalesced, the third is not
|
|
|
|
+ configureMocks();
|
|
|
|
+ startListener(true);
|
|
|
|
+
|
|
|
|
+ listener.statusUpdate(attemptID, firstReduceStatus);
|
|
|
|
+ listener.statusUpdate(attemptID, secondReduceStatus);
|
|
|
|
+ ConcurrentMap<TaskAttemptId,
|
|
|
|
+ AtomicReference<TaskAttemptStatus>> attemptIdToStatus =
|
|
|
|
+ listener.getAttemptIdToStatus();
|
|
|
|
+ attemptIdToStatus.get(attemptId).set(null);
|
|
|
|
+ listener.statusUpdate(attemptID, thirdReduceStatus);
|
|
|
|
+
|
|
|
|
+ verify(ea, times(2)).handle(eventCaptor.capture());
|
|
|
|
+ TaskAttemptStatusUpdateEvent updateEvent =
|
|
|
|
+ (TaskAttemptStatusUpdateEvent) eventCaptor.getValue();
|
|
|
|
+
|
|
|
|
+ TaskAttemptStatus status = updateEvent.getTaskAttemptStatusRef().get();
|
|
|
|
+ assertNull(status.fetchFailedMaps);
|
|
|
|
+ assertEquals(Phase.REDUCE, status.phase);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test(expected = IllegalStateException.class)
|
|
|
|
+ public void testStatusUpdateFromUnregisteredTask()
|
|
|
|
+ throws IOException, InterruptedException{
|
|
|
|
+ configureMocks();
|
|
|
|
+ startListener(false);
|
|
|
|
+
|
|
|
|
+ listener.statusUpdate(attemptID, firstReduceStatus);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void configureMocks() {
|
|
|
|
+ firstReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1,
|
|
|
|
+ TaskStatus.State.RUNNING, "", "RUNNING", "", TaskStatus.Phase.SHUFFLE,
|
|
|
|
+ new Counters());
|
|
|
|
+ firstReduceStatus.addFetchFailedMap(TaskAttemptID.forName(ATTEMPT1_ID));
|
|
|
|
+
|
|
|
|
+ secondReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1,
|
|
|
|
+ TaskStatus.State.RUNNING, "", "RUNNING", "", TaskStatus.Phase.SORT,
|
|
|
|
+ new Counters());
|
|
|
|
+ secondReduceStatus.addFetchFailedMap(TaskAttemptID.forName(ATTEMPT2_ID));
|
|
|
|
+
|
|
|
|
+ thirdReduceStatus = new ReduceTaskStatus(attemptID, 0.0f, 1,
|
|
|
|
+ TaskStatus.State.RUNNING, "", "RUNNING", "",
|
|
|
|
+ TaskStatus.Phase.REDUCE, new Counters());
|
|
|
|
+
|
|
|
|
+ when(dispatcher.getEventHandler()).thenReturn(ea);
|
|
|
|
+ when(appCtx.getEventHandler()).thenReturn(ea);
|
|
|
|
+ policy = new CheckpointAMPreemptionPolicy();
|
|
|
|
+ policy.init(appCtx);
|
|
|
|
+ listener = new MockTaskAttemptListenerImpl(appCtx, secret,
|
|
|
|
+ rmHeartbeatHandler, hbHandler, policy);
|
|
|
|
+ id = new JVMId("foo", 1, true, 1);
|
|
|
|
+ wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId());
|
|
|
|
+ attemptID = new TaskAttemptID("1", 1, TaskType.MAP, 1, 1);
|
|
|
|
+ attemptId = TypeConverter.toYarn(attemptID);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void startListener(boolean registerTask) {
|
|
|
|
+ Configuration conf = new Configuration();
|
|
|
|
+
|
|
|
|
+ listener.init(conf);
|
|
|
|
+ listener.start();
|
|
|
|
+
|
|
|
|
+ if (registerTask) {
|
|
|
|
+ listener.registerPendingTask(task, wid);
|
|
|
|
+ listener.registerLaunchedTask(attemptId, wid);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|