|
@@ -20,8 +20,10 @@ package org.apache.hadoop.mapred;
|
|
|
|
|
|
import static org.mockito.Matchers.any;
|
|
import static org.mockito.Matchers.any;
|
|
import static org.mockito.Mockito.doAnswer;
|
|
import static org.mockito.Mockito.doAnswer;
|
|
|
|
+import static org.mockito.Mockito.doReturn;
|
|
import static org.mockito.Mockito.mock;
|
|
import static org.mockito.Mockito.mock;
|
|
import static org.mockito.Mockito.spy;
|
|
import static org.mockito.Mockito.spy;
|
|
|
|
+import static org.mockito.Mockito.times;
|
|
import static org.mockito.Mockito.verify;
|
|
import static org.mockito.Mockito.verify;
|
|
import static org.mockito.Mockito.when;
|
|
import static org.mockito.Mockito.when;
|
|
|
|
|
|
@@ -30,6 +32,7 @@ import java.io.File;
|
|
import java.io.FileOutputStream;
|
|
import java.io.FileOutputStream;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.io.OutputStream;
|
|
import java.io.OutputStream;
|
|
|
|
+import java.net.InetSocketAddress;
|
|
import java.nio.ByteBuffer;
|
|
import java.nio.ByteBuffer;
|
|
import java.security.PrivilegedExceptionAction;
|
|
import java.security.PrivilegedExceptionAction;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
@@ -39,28 +42,24 @@ import junit.framework.TestCase;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
|
+import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
import org.apache.hadoop.fs.FileContext;
|
|
import org.apache.hadoop.fs.FileContext;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
-import org.apache.hadoop.mapred.ClientCache;
|
|
|
|
-import org.apache.hadoop.mapred.ClientServiceDelegate;
|
|
|
|
-import org.apache.hadoop.mapred.JobConf;
|
|
|
|
-import org.apache.hadoop.mapred.Master;
|
|
|
|
-import org.apache.hadoop.mapred.ResourceMgrDelegate;
|
|
|
|
-import org.apache.hadoop.mapred.YARNRunner;
|
|
|
|
|
|
+import org.apache.hadoop.io.Text;
|
|
import org.apache.hadoop.mapreduce.JobID;
|
|
import org.apache.hadoop.mapreduce.JobID;
|
|
import org.apache.hadoop.mapreduce.JobPriority;
|
|
import org.apache.hadoop.mapreduce.JobPriority;
|
|
import org.apache.hadoop.mapreduce.JobStatus.State;
|
|
import org.apache.hadoop.mapreduce.JobStatus.State;
|
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
|
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
|
|
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
|
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
|
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
|
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
|
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
|
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
|
|
import org.apache.hadoop.security.Credentials;
|
|
import org.apache.hadoop.security.Credentials;
|
|
|
|
+import org.apache.hadoop.security.SecurityUtil;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
|
+import org.apache.hadoop.security.token.Token;
|
|
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
|
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
|
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
|
|
|
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
|
|
|
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
|
@@ -69,21 +68,27 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
|
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
|
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
import org.apache.hadoop.yarn.api.records.DelegationToken;
|
|
import org.apache.hadoop.yarn.api.records.DelegationToken;
|
|
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
|
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
|
-import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
|
|
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|
|
|
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
|
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
|
|
|
+import org.apache.hadoop.yarn.util.BuilderUtils;
|
|
|
|
+import org.apache.hadoop.yarn.util.Records;
|
|
import org.apache.log4j.Appender;
|
|
import org.apache.log4j.Appender;
|
|
import org.apache.log4j.Layout;
|
|
import org.apache.log4j.Layout;
|
|
import org.apache.log4j.Logger;
|
|
import org.apache.log4j.Logger;
|
|
@@ -146,7 +151,7 @@ public class TestYARNRunner extends TestCase {
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
- @Test
|
|
|
|
|
|
+ @Test(timeout=20000)
|
|
public void testJobKill() throws Exception {
|
|
public void testJobKill() throws Exception {
|
|
clientDelegate = mock(ClientServiceDelegate.class);
|
|
clientDelegate = mock(ClientServiceDelegate.class);
|
|
when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new
|
|
when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new
|
|
@@ -171,7 +176,7 @@ public class TestYARNRunner extends TestCase {
|
|
verify(clientDelegate).killJob(jobId);
|
|
verify(clientDelegate).killJob(jobId);
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
|
|
+ @Test(timeout=20000)
|
|
public void testJobSubmissionFailure() throws Exception {
|
|
public void testJobSubmissionFailure() throws Exception {
|
|
when(resourceMgrDelegate.submitApplication(any(ApplicationSubmissionContext.class))).
|
|
when(resourceMgrDelegate.submitApplication(any(ApplicationSubmissionContext.class))).
|
|
thenReturn(appId);
|
|
thenReturn(appId);
|
|
@@ -193,7 +198,7 @@ public class TestYARNRunner extends TestCase {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
|
|
+ @Test(timeout=20000)
|
|
public void testResourceMgrDelegate() throws Exception {
|
|
public void testResourceMgrDelegate() throws Exception {
|
|
/* we not want a mock of resource mgr delegate */
|
|
/* we not want a mock of resource mgr delegate */
|
|
final ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class);
|
|
final ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class);
|
|
@@ -259,8 +264,88 @@ public class TestYARNRunner extends TestCase {
|
|
delegate.getQueueAclsForCurrentUser();
|
|
delegate.getQueueAclsForCurrentUser();
|
|
verify(clientRMProtocol).getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class));
|
|
verify(clientRMProtocol).getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class));
|
|
}
|
|
}
|
|
-
|
|
|
|
- @Test
|
|
|
|
|
|
+
|
|
|
|
+ @Test(timeout=20000)
|
|
|
|
+ public void testGetHSDelegationToken() throws Exception {
|
|
|
|
+ try {
|
|
|
|
+ Configuration conf = new Configuration();
|
|
|
|
+
|
|
|
|
+ // Setup mock service
|
|
|
|
+ InetSocketAddress mockRmAddress = new InetSocketAddress("localhost", 4444);
|
|
|
|
+ Text rmTokenSevice = SecurityUtil.buildTokenService(mockRmAddress);
|
|
|
|
+
|
|
|
|
+ InetSocketAddress mockHsAddress = new InetSocketAddress("localhost", 9200);
|
|
|
|
+ Text hsTokenSevice = SecurityUtil.buildTokenService(mockHsAddress);
|
|
|
|
+
|
|
|
|
+ // Setup mock rm token
|
|
|
|
+ RMDelegationTokenIdentifier tokenIdentifier = new RMDelegationTokenIdentifier(
|
|
|
|
+ new Text("owner"), new Text("renewer"), new Text("real"));
|
|
|
|
+ Token<RMDelegationTokenIdentifier> token = new Token<RMDelegationTokenIdentifier>(
|
|
|
|
+ new byte[0], new byte[0], tokenIdentifier.getKind(), rmTokenSevice);
|
|
|
|
+ token.setKind(RMDelegationTokenIdentifier.KIND_NAME);
|
|
|
|
+
|
|
|
|
+ // Setup mock history token
|
|
|
|
+ DelegationToken historyToken = BuilderUtils.newDelegationToken(
|
|
|
|
+ new byte[0], MRDelegationTokenIdentifier.KIND_NAME.toString(),
|
|
|
|
+ new byte[0], hsTokenSevice.toString());
|
|
|
|
+ GetDelegationTokenResponse getDtResponse = Records
|
|
|
|
+ .newRecord(GetDelegationTokenResponse.class);
|
|
|
|
+ getDtResponse.setDelegationToken(historyToken);
|
|
|
|
+
|
|
|
|
+ // mock services
|
|
|
|
+ MRClientProtocol mockHsProxy = mock(MRClientProtocol.class);
|
|
|
|
+ doReturn(mockHsAddress).when(mockHsProxy).getConnectAddress();
|
|
|
|
+ doReturn(getDtResponse).when(mockHsProxy).getDelegationToken(
|
|
|
|
+ any(GetDelegationTokenRequest.class));
|
|
|
|
+
|
|
|
|
+ ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
|
|
|
|
+ doReturn(mockRmAddress).when(rmDelegate).getConnectAddress();
|
|
|
|
+
|
|
|
|
+ ClientCache clientCache = mock(ClientCache.class);
|
|
|
|
+ doReturn(mockHsProxy).when(clientCache).getInitializedHSProxy();
|
|
|
|
+
|
|
|
|
+ Credentials creds = new Credentials();
|
|
|
|
+
|
|
|
|
+ YARNRunner yarnRunner = new YARNRunner(conf, rmDelegate, clientCache);
|
|
|
|
+
|
|
|
|
+ // No HS token if no RM token
|
|
|
|
+ yarnRunner.addHistoyToken(creds);
|
|
|
|
+ verify(mockHsProxy, times(0)).getDelegationToken(
|
|
|
|
+ any(GetDelegationTokenRequest.class));
|
|
|
|
+
|
|
|
|
+ // No HS token if RM token, but secirity disabled.
|
|
|
|
+ creds.addToken(new Text("rmdt"), token);
|
|
|
|
+ yarnRunner.addHistoyToken(creds);
|
|
|
|
+ verify(mockHsProxy, times(0)).getDelegationToken(
|
|
|
|
+ any(GetDelegationTokenRequest.class));
|
|
|
|
+
|
|
|
|
+ conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION,
|
|
|
|
+ "kerberos");
|
|
|
|
+ UserGroupInformation.setConfiguration(conf);
|
|
|
|
+ creds = new Credentials();
|
|
|
|
+
|
|
|
|
+ // No HS token if no RM token, security enabled
|
|
|
|
+ yarnRunner.addHistoyToken(creds);
|
|
|
|
+ verify(mockHsProxy, times(0)).getDelegationToken(
|
|
|
|
+ any(GetDelegationTokenRequest.class));
|
|
|
|
+
|
|
|
|
+ // HS token if RM token present, security enabled
|
|
|
|
+ creds.addToken(new Text("rmdt"), token);
|
|
|
|
+ yarnRunner.addHistoyToken(creds);
|
|
|
|
+ verify(mockHsProxy, times(1)).getDelegationToken(
|
|
|
|
+ any(GetDelegationTokenRequest.class));
|
|
|
|
+
|
|
|
|
+ // No additional call to get HS token if RM and HS token present
|
|
|
|
+ yarnRunner.addHistoyToken(creds);
|
|
|
|
+ verify(mockHsProxy, times(1)).getDelegationToken(
|
|
|
|
+ any(GetDelegationTokenRequest.class));
|
|
|
|
+ } finally {
|
|
|
|
+ // Back to defaults.
|
|
|
|
+ UserGroupInformation.setConfiguration(new Configuration());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test(timeout=20000)
|
|
public void testHistoryServerToken() throws Exception {
|
|
public void testHistoryServerToken() throws Exception {
|
|
//Set the master principal in the config
|
|
//Set the master principal in the config
|
|
conf.set(YarnConfiguration.RM_PRINCIPAL,"foo@LOCAL");
|
|
conf.set(YarnConfiguration.RM_PRINCIPAL,"foo@LOCAL");
|
|
@@ -303,7 +388,7 @@ public class TestYARNRunner extends TestCase {
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
|
|
+ @Test(timeout=20000)
|
|
public void testAMAdminCommandOpts() throws Exception {
|
|
public void testAMAdminCommandOpts() throws Exception {
|
|
JobConf jobConf = new JobConf();
|
|
JobConf jobConf = new JobConf();
|
|
|
|
|
|
@@ -366,7 +451,7 @@ public class TestYARNRunner extends TestCase {
|
|
assertTrue("AM admin command opts is after user command opts.", adminIndex < userIndex);
|
|
assertTrue("AM admin command opts is after user command opts.", adminIndex < userIndex);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- @Test
|
|
|
|
|
|
+ @Test(timeout=20000)
|
|
public void testWarnCommandOpts() throws Exception {
|
|
public void testWarnCommandOpts() throws Exception {
|
|
Logger logger = Logger.getLogger(YARNRunner.class);
|
|
Logger logger = Logger.getLogger(YARNRunner.class);
|
|
|
|
|