|
@@ -24,6 +24,7 @@ import static org.mockito.Matchers.anyInt;
|
|
|
import static org.mockito.Matchers.isA;
|
|
|
import static org.mockito.Mockito.doCallRealMethod;
|
|
|
import static org.mockito.Mockito.doReturn;
|
|
|
+import static org.mockito.Mockito.inOrder;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
import static org.mockito.Mockito.never;
|
|
|
import static org.mockito.Mockito.times;
|
|
@@ -59,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.MRApp;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
|
|
@@ -69,6 +71,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
|
|
@@ -142,6 +145,7 @@ import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
import com.google.common.base.Supplier;
|
|
|
+import org.mockito.InOrder;
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public class TestRMContainerAllocator {
|
|
@@ -3011,6 +3015,47 @@ public class TestRMContainerAllocator {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * MAPREDUCE-6771. Test if RMContainerAllocator generates the events in the
|
|
|
+ * right order while processing finished containers.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testHandlingFinishedContainers() {
|
|
|
+ EventHandler eventHandler = mock(EventHandler.class);
|
|
|
+
|
|
|
+ AppContext context = mock(MRAppMaster.RunningAppContext.class);
|
|
|
+ when(context.getClock()).thenReturn(new ControlledClock());
|
|
|
+ when(context.getClusterInfo()).thenReturn(
|
|
|
+ new ClusterInfo(Resource.newInstance(10240, 1)));
|
|
|
+ when(context.getEventHandler()).thenReturn(eventHandler);
|
|
|
+ RMContainerAllocator containerAllocator =
|
|
|
+ new RMContainerAllocatorForFinishedContainer(null, context);
|
|
|
+
|
|
|
+ ContainerStatus finishedContainer = ContainerStatus.newInstance(
|
|
|
+ mock(ContainerId.class), ContainerState.COMPLETE, "", 0);
|
|
|
+ containerAllocator.processFinishedContainer(finishedContainer);
|
|
|
+
|
|
|
+ InOrder inOrder = inOrder(eventHandler);
|
|
|
+ inOrder.verify(eventHandler).handle(
|
|
|
+ isA(TaskAttemptDiagnosticsUpdateEvent.class));
|
|
|
+ inOrder.verify(eventHandler).handle(isA(TaskAttemptEvent.class));
|
|
|
+ inOrder.verifyNoMoreInteractions();
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class RMContainerAllocatorForFinishedContainer
|
|
|
+ extends RMContainerAllocator {
|
|
|
+ public RMContainerAllocatorForFinishedContainer(ClientService clientService,
|
|
|
+ AppContext context) {
|
|
|
+ super(clientService, context);
|
|
|
+ }
|
|
|
+ @Override
|
|
|
+ protected AssignedRequests createAssignedRequests() {
|
|
|
+ AssignedRequests assignedReqs = mock(AssignedRequests.class);
|
|
|
+ TaskAttemptId taskAttempt = mock(TaskAttemptId.class);
|
|
|
+ when(assignedReqs.get(any(ContainerId.class))).thenReturn(taskAttempt);
|
|
|
+ return assignedReqs;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
@Test
|
|
|
public void testAvoidAskMoreReducersWhenReducerPreemptionIsRequired()
|