|
@@ -16,7 +16,7 @@
|
|
* limitations under the License.
|
|
* limitations under the License.
|
|
*/
|
|
*/
|
|
|
|
|
|
-package org.apache.hadoop.mapreduce.v2;
|
|
|
|
|
|
+package org.apache.hadoop.mapred;
|
|
|
|
|
|
import static org.mockito.Matchers.any;
|
|
import static org.mockito.Matchers.any;
|
|
import static org.mockito.Mockito.doAnswer;
|
|
import static org.mockito.Mockito.doAnswer;
|
|
@@ -29,6 +29,8 @@ import java.io.File;
|
|
import java.io.FileOutputStream;
|
|
import java.io.FileOutputStream;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.io.OutputStream;
|
|
import java.io.OutputStream;
|
|
|
|
+import java.nio.ByteBuffer;
|
|
|
|
+import java.security.PrivilegedExceptionAction;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
|
|
import junit.framework.TestCase;
|
|
import junit.framework.TestCase;
|
|
@@ -48,7 +50,11 @@ import org.apache.hadoop.mapreduce.JobPriority;
|
|
import org.apache.hadoop.mapreduce.JobStatus.State;
|
|
import org.apache.hadoop.mapreduce.JobStatus.State;
|
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
|
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
|
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
|
|
import org.apache.hadoop.security.Credentials;
|
|
import org.apache.hadoop.security.Credentials;
|
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
|
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
|
@@ -69,6 +75,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
|
|
+import org.apache.hadoop.yarn.api.records.DelegationToken;
|
|
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
|
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
|
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
|
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|
@@ -239,6 +246,46 @@ public class TestYARNRunner extends TestCase {
|
|
delegate.getQueueAclsForCurrentUser();
|
|
delegate.getQueueAclsForCurrentUser();
|
|
verify(clientRMProtocol).getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class));
|
|
verify(clientRMProtocol).getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class));
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testHistoryServerToken() throws Exception {
|
|
|
|
+ final String masterPrincipal = Master.getMasterPrincipal(conf);
|
|
|
|
+
|
|
|
|
+ final MRClientProtocol hsProxy = mock(MRClientProtocol.class);
|
|
|
|
+ when(hsProxy.getDelegationToken(any(GetDelegationTokenRequest.class))).thenAnswer(
|
|
|
|
+ new Answer<GetDelegationTokenResponse>() {
|
|
|
|
+ public GetDelegationTokenResponse answer(InvocationOnMock invocation) {
|
|
|
|
+ GetDelegationTokenRequest request =
|
|
|
|
+ (GetDelegationTokenRequest)invocation.getArguments()[0];
|
|
|
|
+ // check that the renewer matches the cluster's RM principal
|
|
|
|
+ assertEquals(request.getRenewer(), masterPrincipal);
|
|
|
|
+
|
|
|
|
+ DelegationToken token =
|
|
|
|
+ recordFactory.newRecordInstance(DelegationToken.class);
|
|
|
|
+ // none of these fields matter for the sake of the test
|
|
|
|
+ token.setKind("");
|
|
|
|
+ token.setService("");
|
|
|
|
+ token.setIdentifier(ByteBuffer.allocate(0));
|
|
|
|
+ token.setPassword(ByteBuffer.allocate(0));
|
|
|
|
+ GetDelegationTokenResponse tokenResponse =
|
|
|
|
+ recordFactory.newRecordInstance(GetDelegationTokenResponse.class);
|
|
|
|
+ tokenResponse.setDelegationToken(token);
|
|
|
|
+ return tokenResponse;
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+ UserGroupInformation.createRemoteUser("someone").doAs(
|
|
|
|
+ new PrivilegedExceptionAction<Void>() {
|
|
|
|
+ @Override
|
|
|
|
+ public Void run() throws Exception {
|
|
|
|
+ yarnRunner = new YARNRunner(conf, null, null);
|
|
|
|
+ yarnRunner.getDelegationTokenFromHS(hsProxy);
|
|
|
|
+ verify(hsProxy).
|
|
|
|
+ getDelegationToken(any(GetDelegationTokenRequest.class));
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ }
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testAMAdminCommandOpts() throws Exception {
|
|
public void testAMAdminCommandOpts() throws Exception {
|