|
@@ -55,12 +55,14 @@ import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerToken;
|
|
import org.apache.hadoop.yarn.event.Event;
|
|
import org.apache.hadoop.yarn.event.Event;
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
|
|
|
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
|
import org.junit.Before;
|
|
import org.junit.Before;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
@@ -172,6 +174,8 @@ public class TestContainerLauncherImpl {
|
|
when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
|
|
when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
|
|
when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
|
|
when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
|
|
when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp);
|
|
when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp);
|
|
|
|
+ when(mockLaunchEvent.getContainerToken()).thenReturn(
|
|
|
|
+ createNewContainerToken(contId, cmAddress));
|
|
ut.handle(mockLaunchEvent);
|
|
ut.handle(mockLaunchEvent);
|
|
|
|
|
|
ut.waitForPoolToIdle();
|
|
ut.waitForPoolToIdle();
|
|
@@ -250,6 +254,8 @@ public class TestContainerLauncherImpl {
|
|
when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
|
|
when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
|
|
when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
|
|
when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
|
|
when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp);
|
|
when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp);
|
|
|
|
+ when(mockLaunchEvent.getContainerToken()).thenReturn(
|
|
|
|
+ createNewContainerToken(contId, cmAddress));
|
|
ut.handle(mockLaunchEvent);
|
|
ut.handle(mockLaunchEvent);
|
|
|
|
|
|
ut.waitForPoolToIdle();
|
|
ut.waitForPoolToIdle();
|
|
@@ -299,6 +305,8 @@ public class TestContainerLauncherImpl {
|
|
when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
|
|
when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
|
|
when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
|
|
when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
|
|
when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp);
|
|
when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp);
|
|
|
|
+ when(mockLaunchEvent.getContainerToken()).thenReturn(
|
|
|
|
+ createNewContainerToken(contId, cmAddress));
|
|
ut.handle(mockLaunchEvent);
|
|
ut.handle(mockLaunchEvent);
|
|
|
|
|
|
ut.waitForPoolToIdle();
|
|
ut.waitForPoolToIdle();
|
|
@@ -356,6 +364,8 @@ public class TestContainerLauncherImpl {
|
|
.thenReturn(contId);
|
|
.thenReturn(contId);
|
|
when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
|
|
when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
|
|
when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
|
|
when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
|
|
|
|
+ when(mockLaunchEvent.getContainerToken()).thenReturn(
|
|
|
|
+ createNewContainerToken(contId, cmAddress));
|
|
ut.handle(mockLaunchEvent);
|
|
ut.handle(mockLaunchEvent);
|
|
|
|
|
|
startLaunchBarrier.await();
|
|
startLaunchBarrier.await();
|
|
@@ -394,6 +404,15 @@ public class TestContainerLauncherImpl {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private ContainerToken createNewContainerToken(ContainerId contId,
|
|
|
|
+ String containerManagerAddr) {
|
|
|
|
+ return BuilderUtils.newContainerToken(BuilderUtils.newNodeId("127.0.0.1",
|
|
|
|
+ 1234), "password".getBytes(), new ContainerTokenIdentifier(
|
|
|
|
+ contId, containerManagerAddr, "user",
|
|
|
|
+ BuilderUtils.newResource(1024, 1),
|
|
|
|
+ System.currentTimeMillis() + 10000L, 123));
|
|
|
|
+ }
|
|
|
|
+
|
|
private static class ContainerManagerForTest implements ContainerManager {
|
|
private static class ContainerManagerForTest implements ContainerManager {
|
|
|
|
|
|
private CyclicBarrier startLaunchBarrier;
|
|
private CyclicBarrier startLaunchBarrier;
|