@@ -35,26 +35,28 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.SleepJob;
-import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
-@SuppressWarnings("deprecation")
-@Ignore
 public class TestBinaryTokenFile {
 
+  private static final String KEY_SECURITY_TOKEN_FILE_NAME = "key-security-token-file";
+  private static final String DELEGATION_TOKEN_KEY = "Hdfs";
+
   // my sleep class
   static class MySleepMapper extends SleepJob.SleepMapper {
     /**
@@ -63,29 +65,67 @@ public class TestBinaryTokenFile {
     @Override
     public void map(IntWritable key, IntWritable value, Context context)
     throws IOException, InterruptedException {
-      // get token storage and a key
-      Credentials ts = context.getCredentials();
-      Collection<Token<? extends TokenIdentifier>> dts = ts.getAllTokens();
+      // get context token storage:
+      final Credentials contextCredentials = context.getCredentials();
 
+      final Collection<Token<? extends TokenIdentifier>> contextTokenCollection = contextCredentials.getAllTokens();
+      for (Token<? extends TokenIdentifier> t : contextTokenCollection) {
+        System.out.println("Context token: [" + t + "]");
+      }
+      if (contextTokenCollection.size() != 2) { // one job token and one delegation token
+        // fail the test:
+        throw new RuntimeException("Exactly 2 tokens are expected in the contextTokenCollection: " +
+            "one job token and one delegation token, but " + contextTokenCollection.size() + " tokens were found.");
+      }
 
-      if(dts.size() != 2) { // one job token and one delegation token
-        throw new RuntimeException("tokens are not available"); // fail the test
+      final Token<? extends TokenIdentifier> dt = contextCredentials.getToken(new Text(DELEGATION_TOKEN_KEY));
+      if (dt == null) {
+        throw new RuntimeException("Token for key [" + DELEGATION_TOKEN_KEY + "] not found in the job context.");
       }
 
-      Token<? extends TokenIdentifier> dt = ts.getToken(new Text("Hdfs"));
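+      // MAPREDUCE_JOB_CREDENTIALS_BINARY gets deleted from the config on submission, so it must not be visible to the task: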
+      String tokenFile0 = context.getConfiguration().get(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
+      if (tokenFile0 != null) {
+        throw new RuntimeException("Token file key [" + MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY + "] found in the configuration. It should have been removed from the configuration.");
+      }
 
-      //Verify that dt is same as the token in the file
-      String tokenFile = context.getConfiguration().get(
-          "mapreduce.job.credentials.binary");
-      Credentials cred = new Credentials();
-      cred.readTokenStorageStream(new DataInputStream(new FileInputStream(
+      final String tokenFile = context.getConfiguration().get(KEY_SECURITY_TOKEN_FILE_NAME);
+      if (tokenFile == null) {
+        throw new RuntimeException("Token file key [" + KEY_SECURITY_TOKEN_FILE_NAME + "] not found in the job configuration.");
+      }
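+      // read the credentials back from the binary token file to compare them against the context token: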
+      final Credentials binaryCredentials = new Credentials();
+      binaryCredentials.readTokenStorageStream(new DataInputStream(new FileInputStream(
           tokenFile)));
-      for (Token<? extends TokenIdentifier> t : cred.getAllTokens()) {
-        if (!dt.equals(t)) {
-          throw new RuntimeException(
-              "Delegation token in job is not same as the token passed in file."
-              + " tokenInFile=" + t + ", dt=" + dt);
-        }
+      final Collection<Token<? extends TokenIdentifier>> binaryTokenCollection = binaryCredentials.getAllTokens();
+      if (binaryTokenCollection.size() != 1) {
+        throw new RuntimeException("The token collection read from file [" + tokenFile + "] must have size = 1.");
+      }
+      final Token<? extends TokenIdentifier> binTok = binaryTokenCollection.iterator().next();
+      System.out.println("The token read from binary file: t = [" + binTok + "]");
+      // Verify that dt is the same as the token in the file:
+      if (!dt.equals(binTok)) {
+        throw new RuntimeException(
+            "Delegation token in job is not the same as the token passed in file:"
+            + " tokenInFile=[" + binTok + "], dt=[" + dt + "].");
+      }
+
+      // Now test the user tokens.
+      final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      // Print all the UGI tokens for diagnostic purposes:
+      final Collection<Token<? extends TokenIdentifier>> ugiTokenCollection = ugi.getTokens();
+      for (Token<? extends TokenIdentifier> t : ugiTokenCollection) {
+        System.out.println("UGI token: [" + t + "]");
+      }
+      final Token<? extends TokenIdentifier> ugiToken
+          = ugi.getCredentials().getToken(new Text(DELEGATION_TOKEN_KEY));
+      if (ugiToken == null) {
+        throw new RuntimeException("Token for key [" + DELEGATION_TOKEN_KEY + "] not found among the UGI tokens.");
+      }
+      if (!ugiToken.equals(binTok)) {
+        throw new RuntimeException(
+            "UGI token is not the same as the token passed in binary file:"
+            + " tokenInBinFile=[" + binTok + "], ugiTok=[" + ugiToken + "].");
       }
 
       super.map(key, value, context);
@@ -118,13 +158,20 @@ public class TestBinaryTokenFile {
       TokenCache.obtainTokensForNamenodesInternal(cred1, new Path[] { p1 },
           job.getConfiguration());
       for (Token<? extends TokenIdentifier> t : cred1.getAllTokens()) {
-        cred2.addToken(new Text("Hdfs"), t);
+        cred2.addToken(new Text(DELEGATION_TOKEN_KEY), t);
       }
       DataOutputStream os = new DataOutputStream(new FileOutputStream(
           binaryTokenFileName.toString()));
-      cred2.writeTokenStorageToStream(os);
-      os.close();
-      job.getConfiguration().set("mapreduce.job.credentials.binary",
+      try {
+        cred2.writeTokenStorageToStream(os);
+      } finally {
+        os.close();
+      }
+      job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY,
+          binaryTokenFileName.toString());
+      // NB: the MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY key gets deleted from the config
+      // before the tasks see it, so we use another key to pass the file name into the job configuration:
+      job.getConfiguration().set(KEY_SECURITY_TOKEN_FILE_NAME,
           binaryTokenFileName.toString());
     } catch (IOException e) {
       Assert.fail("Exception " + e);
@@ -132,39 +179,56 @@ public class TestBinaryTokenFile {
     }
   }
 
-  private static MiniMRCluster mrCluster;
+  private static MiniMRYarnCluster mrCluster;
   private static MiniDFSCluster dfsCluster;
+
   private static final Path TEST_DIR =
       new Path(System.getProperty("test.build.data","/tmp"));
   private static final Path binaryTokenFileName = new Path(TEST_DIR, "tokenFile.binary");
-  private static int numSlaves = 1;
-  private static JobConf jConf;
+
+  private static final int numSlaves = 1; // num of data nodes
+  private static final int noOfNMs = 1;
+
   private static Path p1;
 
   @BeforeClass
   public static void setUp() throws Exception {
-    Configuration conf = new Configuration();
-    dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
-    jConf = new JobConf(conf);
-    mrCluster = new MiniMRCluster(0, 0, numSlaves,
-        dfsCluster.getFileSystem().getUri().toString(), 1, null, null, null,
-        jConf);
+    final Configuration conf = new Configuration();
+
+    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
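+    // set an RM principal so that a renewer can be determined for the delegation tokens: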
+    conf.set(YarnConfiguration.RM_PRINCIPAL, "jt_id/" + SecurityUtil.HOSTNAME_PATTERN + "@APACHE.ORG");
+
+    final MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+    builder.checkExitOnShutdown(true);
+    builder.numDataNodes(numSlaves);
+    builder.format(true);
+    builder.racks(null);
+    dfsCluster = builder.build();
+
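+    // start a MiniMRYarnCluster with a single NodeManager: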
+    mrCluster = new MiniMRYarnCluster(TestBinaryTokenFile.class.getName(), noOfNMs);
+    mrCluster.init(conf);
+    mrCluster.start();
 
-    NameNodeAdapter.getDtSecretManager(dfsCluster.getNamesystem()).startThreads();
-    FileSystem fs = dfsCluster.getFileSystem();
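+    // start the NameNode's delegation token secret manager so that tokens can be issued: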
+    NameNodeAdapter.getDtSecretManager(dfsCluster.getNamesystem()).startThreads();
 
+    FileSystem fs = dfsCluster.getFileSystem();
     p1 = new Path("file1");
     p1 = fs.makeQualified(p1);
   }
 
   @AfterClass
   public static void tearDown() throws Exception {
-    if(mrCluster != null)
-      mrCluster.shutdown();
-    mrCluster = null;
-    if(dfsCluster != null)
+    if(mrCluster != null) {
+      mrCluster.stop();
+      mrCluster = null;
+    }
+    if(dfsCluster != null) {
       dfsCluster.shutdown();
-    dfsCluster = null;
+      dfsCluster = null;
+    }
   }
 
   /**
@@ -173,31 +237,24 @@ public class TestBinaryTokenFile {
    */
   @Test
   public void testBinaryTokenFile() throws IOException {
-
-    System.out.println("running dist job");
-
-    // make sure JT starts
-    jConf = mrCluster.createJobConf();
+    Configuration conf = mrCluster.getConfig();
 
     // provide namenodes names for the job to get the delegation tokens for
-    String nnUri = dfsCluster.getURI(0).toString();
-    jConf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri);
-    // job tracker principla id..
-    jConf.set(JTConfig.JT_USER_NAME, "jt_id");
+    final String nnUri = dfsCluster.getURI(0).toString();
+    conf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri);
 
     // using argument to pass the file name
-    String[] args = {
+    final String[] args = {
         "-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
     };
-
     int res = -1;
     try {
-      res = ToolRunner.run(jConf, new MySleepJob(), args);
+      res = ToolRunner.run(conf, new MySleepJob(), args);
     } catch (Exception e) {
-      System.out.println("Job failed with" + e.getLocalizedMessage());
+      System.out.println("Job failed with " + e.getLocalizedMessage());
       e.printStackTrace(System.out);
       fail("Job failed");
     }
-    assertEquals("dist job res is not 0", res, 0);
+    assertEquals("dist job res is not 0:", 0, res);
   }
 }