|
@@ -18,6 +18,7 @@
|
|
|
|
|
|
package org.apache.hadoop.mapreduce.v2.app.rm;
|
|
|
|
|
|
+import static org.junit.Assert.assertEquals;
|
|
|
import static org.mockito.Matchers.any;
|
|
|
import static org.mockito.Matchers.anyFloat;
|
|
|
import static org.mockito.Matchers.anyInt;
|
|
@@ -27,6 +28,7 @@ import static org.mockito.Mockito.doReturn;
|
|
|
import static org.mockito.Mockito.inOrder;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
import static org.mockito.Mockito.never;
|
|
|
+import static org.mockito.Mockito.spy;
|
|
|
import static org.mockito.Mockito.times;
|
|
|
import static org.mockito.Mockito.verify;
|
|
|
import static org.mockito.Mockito.when;
|
|
@@ -99,6 +101,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
|
|
|
import org.apache.hadoop.yarn.api.records.Container;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
@@ -110,6 +113,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
|
|
|
import org.apache.hadoop.yarn.api.records.Priority;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
|
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
|
@@ -121,6 +125,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
|
|
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
|
|
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
|
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
|
@@ -137,9 +142,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.security.TimelineV2DelegationTokenSecretManagerService.TimelineV2DelegationTokenSecretManager;
|
|
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
|
import org.apache.hadoop.yarn.util.Clock;
|
|
|
import org.apache.hadoop.yarn.util.ControlledClock;
|
|
|
+import org.apache.hadoop.yarn.util.Records;
|
|
|
import org.apache.hadoop.yarn.util.SystemClock;
|
|
|
import org.junit.After;
|
|
|
import org.junit.Assert;
|
|
@@ -748,6 +755,96 @@ public class TestRMContainerAllocator {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testUpdateCollectorInfo() throws Exception {
|
|
|
+ LOG.info("Running testUpdateCollectorInfo");
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
|
|
+ conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
|
|
|
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
|
|
|
+ ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
|
|
|
+ JobId jobId = MRBuilderUtils.newJobId(appId, 0);
|
|
|
+ Job mockJob = mock(Job.class);
|
|
|
+ when(mockJob.getReport()).thenReturn(
|
|
|
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
|
|
|
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
|
|
|
+ String localAddr = "localhost:1234";
|
|
|
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
|
+
|
|
|
+ // Generate a timeline delegation token.
|
|
|
+ TimelineDelegationTokenIdentifier ident =
|
|
|
+ new TimelineDelegationTokenIdentifier(new Text(ugi.getUserName()),
|
|
|
+ new Text("renewer"), null);
|
|
|
+ ident.setSequenceNumber(1);
|
|
|
+ Token<TimelineDelegationTokenIdentifier> collectorToken =
|
|
|
+ new Token<TimelineDelegationTokenIdentifier> (ident.getBytes(),
|
|
|
+ new byte[0], TimelineDelegationTokenIdentifier.KIND_NAME,
|
|
|
+ new Text(localAddr));
|
|
|
+ org.apache.hadoop.yarn.api.records.Token token =
|
|
|
+ org.apache.hadoop.yarn.api.records.Token.newInstance(
|
|
|
+ collectorToken.getIdentifier(), collectorToken.getKind().toString(),
|
|
|
+ collectorToken.getPassword(),
|
|
|
+ collectorToken.getService().toString());
|
|
|
+ CollectorInfo collectorInfo = CollectorInfo.newInstance(localAddr, token);
|
|
|
+ // Mock scheduler to server Allocate request.
|
|
|
+ final MockSchedulerForTimelineCollector mockScheduler =
|
|
|
+ new MockSchedulerForTimelineCollector(collectorInfo);
|
|
|
+ MyContainerAllocator allocator =
|
|
|
+ new MyContainerAllocator(null, conf, attemptId, mockJob,
|
|
|
+ SystemClock.getInstance()) {
|
|
|
+ @Override
|
|
|
+ protected void register() {
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected ApplicationMasterProtocol createSchedulerProxy() {
|
|
|
+ return mockScheduler;
|
|
|
+ }
|
|
|
+ };
|
|
|
+ // Initially UGI should have no tokens.
|
|
|
+ ArrayList<Token<? extends TokenIdentifier>> tokens =
|
|
|
+ new ArrayList<>(ugi.getTokens());
|
|
|
+ assertEquals(0, tokens.size());
|
|
|
+ TimelineV2Client client = spy(TimelineV2Client.createTimelineClient(appId));
|
|
|
+ client.init(conf);
|
|
|
+ when(((RunningAppContext)allocator.getContext()).getTimelineV2Client()).
|
|
|
+ thenReturn(client);
|
|
|
+
|
|
|
+ // Send allocate request to RM and fetch collector address and token.
|
|
|
+ allocator.schedule();
|
|
|
+ verify(client).setTimelineCollectorInfo(collectorInfo);
|
|
|
+ // Verify if token has been updated in UGI.
|
|
|
+ tokens = new ArrayList<>(ugi.getTokens());
|
|
|
+ assertEquals(1, tokens.size());
|
|
|
+ assertEquals(TimelineDelegationTokenIdentifier.KIND_NAME,
|
|
|
+ tokens.get(0).getKind());
|
|
|
+ assertEquals(collectorToken.decodeIdentifier(),
|
|
|
+ tokens.get(0).decodeIdentifier());
|
|
|
+
|
|
|
+ // Generate new collector token, send allocate request to RM and fetch the
|
|
|
+ // new token.
|
|
|
+ ident.setSequenceNumber(100);
|
|
|
+ Token<TimelineDelegationTokenIdentifier> collectorToken1 =
|
|
|
+ new Token<TimelineDelegationTokenIdentifier> (ident.getBytes(),
|
|
|
+ new byte[0], TimelineDelegationTokenIdentifier.KIND_NAME,
|
|
|
+ new Text(localAddr));
|
|
|
+ token = org.apache.hadoop.yarn.api.records.Token.newInstance(
|
|
|
+ collectorToken1.getIdentifier(), collectorToken1.getKind().toString(),
|
|
|
+ collectorToken1.getPassword(), collectorToken1.getService().toString());
|
|
|
+ collectorInfo = CollectorInfo.newInstance(localAddr, token);
|
|
|
+ mockScheduler.updateCollectorInfo(collectorInfo);
|
|
|
+ allocator.schedule();
|
|
|
+ verify(client).setTimelineCollectorInfo(collectorInfo);
|
|
|
+ // Verify if new token has been updated in UGI.
|
|
|
+ tokens = new ArrayList<>(ugi.getTokens());
|
|
|
+ assertEquals(1, tokens.size());
|
|
|
+ assertEquals(TimelineDelegationTokenIdentifier.KIND_NAME,
|
|
|
+ tokens.get(0).getKind());
|
|
|
+ assertEquals(collectorToken1.decodeIdentifier(),
|
|
|
+ tokens.get(0).decodeIdentifier());
|
|
|
+ allocator.close();
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testMapReduceScheduling() throws Exception {
|
|
|
|
|
@@ -3488,6 +3585,46 @@ public class TestRMContainerAllocator {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private static class MockSchedulerForTimelineCollector
|
|
|
+ implements ApplicationMasterProtocol {
|
|
|
+ CollectorInfo collectorInfo;
|
|
|
+
|
|
|
+ public MockSchedulerForTimelineCollector(CollectorInfo info) {
|
|
|
+ this.collectorInfo = info;
|
|
|
+ }
|
|
|
+
|
|
|
+ void updateCollectorInfo(CollectorInfo info) {
|
|
|
+ collectorInfo = info;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public RegisterApplicationMasterResponse registerApplicationMaster(
|
|
|
+ RegisterApplicationMasterRequest request) throws YarnException,
|
|
|
+ IOException {
|
|
|
+ return Records.newRecord(RegisterApplicationMasterResponse.class);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public FinishApplicationMasterResponse finishApplicationMaster(
|
|
|
+ FinishApplicationMasterRequest request) throws YarnException,
|
|
|
+ IOException {
|
|
|
+ return FinishApplicationMasterResponse.newInstance(false);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public AllocateResponse allocate(AllocateRequest request)
|
|
|
+ throws YarnException, IOException {
|
|
|
+ AllocateResponse response = AllocateResponse.newInstance(
|
|
|
+ request.getResponseId(), Collections.<ContainerStatus>emptyList(),
|
|
|
+ Collections.<Container>emptyList(),
|
|
|
+ Collections.<NodeReport>emptyList(),
|
|
|
+ Resource.newInstance(512000, 1024), null, 10, null,
|
|
|
+ Collections.<NMToken>emptyList());
|
|
|
+ response.setCollectorInfo(collectorInfo);
|
|
|
+ return response;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public static void main(String[] args) throws Exception {
|
|
|
TestRMContainerAllocator t = new TestRMContainerAllocator();
|
|
|
t.testSimple();
|