|
@@ -18,7 +18,6 @@
|
|
|
package org.apache.hadoop.mapreduce.v2.app.launcher;
|
|
|
|
|
|
import static org.mockito.Matchers.any;
|
|
|
-import static org.mockito.Matchers.eq;
|
|
|
import static org.mockito.Mockito.atLeast;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
import static org.mockito.Mockito.never;
|
|
@@ -26,7 +25,6 @@ import static org.mockito.Mockito.verify;
|
|
|
import static org.mockito.Mockito.when;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
-import java.net.InetSocketAddress;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
@@ -59,12 +57,12 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.api.records.Token;
|
|
|
+import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
|
|
|
import org.apache.hadoop.yarn.event.Event;
|
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
-import org.apache.hadoop.yarn.ipc.YarnRPC;
|
|
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
@@ -88,13 +86,25 @@ public class TestContainerLauncherImpl {
|
|
|
private static class ContainerLauncherImplUnderTest extends
|
|
|
ContainerLauncherImpl {
|
|
|
|
|
|
- private YarnRPC rpc;
|
|
|
-
|
|
|
- public ContainerLauncherImplUnderTest(AppContext context, YarnRPC rpc) {
|
|
|
+ private ContainerManagementProtocol containerManager;
|
|
|
+
|
|
|
+ public ContainerLauncherImplUnderTest(AppContext context,
|
|
|
+ ContainerManagementProtocol containerManager) {
|
|
|
super(context);
|
|
|
- this.rpc = rpc;
|
|
|
+ this.containerManager = containerManager;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public ContainerManagementProtocolProxyData getCMProxy(
|
|
|
+ String containerMgrBindAddr, ContainerId containerId)
|
|
|
+ throws IOException {
|
|
|
+ ContainerManagementProtocolProxyData protocolProxy =
|
|
|
+ mock(ContainerManagementProtocolProxyData.class);
|
|
|
+ when(protocolProxy.getContainerManagementProtocol()).thenReturn(
|
|
|
+ containerManager);
|
|
|
+ return protocolProxy;
|
|
|
+ }
|
|
|
+
|
|
|
public void waitForPoolToIdle() throws InterruptedException {
|
|
|
//I wish that we did not need the sleep, but it is here so that we are sure
|
|
|
// That the other thread had time to insert the event into the queue and
|
|
@@ -133,22 +143,18 @@ public class TestContainerLauncherImpl {
|
|
|
return MRBuilderUtils.newTaskAttemptId(tID, id);
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
+ @Test(timeout = 5000)
|
|
|
public void testHandle() throws Exception {
|
|
|
LOG.info("STARTING testHandle");
|
|
|
- YarnRPC mockRpc = mock(YarnRPC.class);
|
|
|
AppContext mockContext = mock(AppContext.class);
|
|
|
@SuppressWarnings("rawtypes")
|
|
|
EventHandler mockEventHandler = mock(EventHandler.class);
|
|
|
when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
|
|
|
-
|
|
|
- ContainerManagementProtocol mockCM = mock(ContainerManagementProtocol.class);
|
|
|
- when(mockRpc.getProxy(eq(ContainerManagementProtocol.class),
|
|
|
- any(InetSocketAddress.class), any(Configuration.class)))
|
|
|
- .thenReturn(mockCM);
|
|
|
-
|
|
|
- ContainerLauncherImplUnderTest ut =
|
|
|
- new ContainerLauncherImplUnderTest(mockContext, mockRpc);
|
|
|
+ String cmAddress = "127.0.0.1:8000";
|
|
|
+ ContainerManagementProtocol mockCM =
|
|
|
+ mock(ContainerManagementProtocol.class);
|
|
|
+ ContainerLauncherImplUnderTest ut =
|
|
|
+ new ContainerLauncherImplUnderTest(mockContext, mockCM);
|
|
|
|
|
|
Configuration conf = new Configuration();
|
|
|
ut.init(conf);
|
|
@@ -156,7 +162,6 @@ public class TestContainerLauncherImpl {
|
|
|
try {
|
|
|
ContainerId contId = makeContainerId(0l, 0, 0, 1);
|
|
|
TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
|
|
|
- String cmAddress = "127.0.0.1:8000";
|
|
|
StartContainerResponse startResp =
|
|
|
recordFactory.newRecordInstance(StartContainerResponse.class);
|
|
|
startResp.setAllServicesMetaData(serviceResponse);
|
|
@@ -199,22 +204,18 @@ public class TestContainerLauncherImpl {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
+ @Test(timeout = 5000)
|
|
|
public void testOutOfOrder() throws Exception {
|
|
|
LOG.info("STARTING testOutOfOrder");
|
|
|
- YarnRPC mockRpc = mock(YarnRPC.class);
|
|
|
AppContext mockContext = mock(AppContext.class);
|
|
|
@SuppressWarnings("rawtypes")
|
|
|
EventHandler mockEventHandler = mock(EventHandler.class);
|
|
|
when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
|
|
|
|
|
|
- ContainerManagementProtocol mockCM = mock(ContainerManagementProtocol.class);
|
|
|
- when(mockRpc.getProxy(eq(ContainerManagementProtocol.class),
|
|
|
- any(InetSocketAddress.class), any(Configuration.class)))
|
|
|
- .thenReturn(mockCM);
|
|
|
-
|
|
|
- ContainerLauncherImplUnderTest ut =
|
|
|
- new ContainerLauncherImplUnderTest(mockContext, mockRpc);
|
|
|
+ ContainerManagementProtocol mockCM =
|
|
|
+ mock(ContainerManagementProtocol.class);
|
|
|
+ ContainerLauncherImplUnderTest ut =
|
|
|
+ new ContainerLauncherImplUnderTest(mockContext, mockCM);
|
|
|
|
|
|
Configuration conf = new Configuration();
|
|
|
ut.init(conf);
|
|
@@ -264,23 +265,19 @@ public class TestContainerLauncherImpl {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
+ @Test(timeout = 5000)
|
|
|
public void testMyShutdown() throws Exception {
|
|
|
LOG.info("in test Shutdown");
|
|
|
|
|
|
- YarnRPC mockRpc = mock(YarnRPC.class);
|
|
|
AppContext mockContext = mock(AppContext.class);
|
|
|
@SuppressWarnings("rawtypes")
|
|
|
EventHandler mockEventHandler = mock(EventHandler.class);
|
|
|
when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
|
|
|
|
|
|
- ContainerManagementProtocol mockCM = mock(ContainerManagementProtocol.class);
|
|
|
- when(mockRpc.getProxy(eq(ContainerManagementProtocol.class),
|
|
|
- any(InetSocketAddress.class), any(Configuration.class)))
|
|
|
- .thenReturn(mockCM);
|
|
|
-
|
|
|
+ ContainerManagementProtocol mockCM =
|
|
|
+ mock(ContainerManagementProtocol.class);
|
|
|
ContainerLauncherImplUnderTest ut =
|
|
|
- new ContainerLauncherImplUnderTest(mockContext, mockRpc);
|
|
|
+ new ContainerLauncherImplUnderTest(mockContext, mockCM);
|
|
|
|
|
|
Configuration conf = new Configuration();
|
|
|
ut.init(conf);
|
|
@@ -320,26 +317,22 @@ public class TestContainerLauncherImpl {
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings({ "rawtypes", "unchecked" })
|
|
|
- @Test
|
|
|
+ @Test(timeout = 5000)
|
|
|
public void testContainerCleaned() throws Exception {
|
|
|
LOG.info("STARTING testContainerCleaned");
|
|
|
|
|
|
CyclicBarrier startLaunchBarrier = new CyclicBarrier(2);
|
|
|
CyclicBarrier completeLaunchBarrier = new CyclicBarrier(2);
|
|
|
|
|
|
- YarnRPC mockRpc = mock(YarnRPC.class);
|
|
|
AppContext mockContext = mock(AppContext.class);
|
|
|
|
|
|
EventHandler mockEventHandler = mock(EventHandler.class);
|
|
|
when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
|
|
|
|
|
|
- ContainerManagementProtocol mockCM = new ContainerManagerForTest(startLaunchBarrier, completeLaunchBarrier);
|
|
|
- when(mockRpc.getProxy(eq(ContainerManagementProtocol.class),
|
|
|
- any(InetSocketAddress.class), any(Configuration.class)))
|
|
|
- .thenReturn(mockCM);
|
|
|
-
|
|
|
- ContainerLauncherImplUnderTest ut =
|
|
|
- new ContainerLauncherImplUnderTest(mockContext, mockRpc);
|
|
|
+ ContainerManagementProtocol mockCM =
|
|
|
+ new ContainerManagerForTest(startLaunchBarrier, completeLaunchBarrier);
|
|
|
+ ContainerLauncherImplUnderTest ut =
|
|
|
+ new ContainerLauncherImplUnderTest(mockContext, mockCM);
|
|
|
|
|
|
Configuration conf = new Configuration();
|
|
|
ut.init(conf);
|