@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.hadoop.security;
+package org.apache.hadoop.mapreduce.security;


 import static org.junit.Assert.assertEquals;
@@ -23,7 +23,9 @@ import static org.junit.Assert.fail;

 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;

@@ -32,8 +34,12 @@ import javax.crypto.spec.SecretKeySpec;

 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.security.token.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -41,16 +47,20 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.Job;
-//import org.apache.hadoop.mapreduce.SleepJob;
-import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.TokenStorage;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.apache.hadoop.mapreduce.security.TokenStorage;
-import org.apache.hadoop.mapreduce.security.TokenCache;
+import static org.junit.Assert.*;
+

 public class TestTokenCache {
   private static final int NUM_OF_KEYS = 10;
@@ -67,15 +77,29 @@ public class TestTokenCache {
       // get token storage and a key
       TokenStorage ts = TokenCache.getTokenStorage();
       byte[] key1 = TokenCache.getSecretKey(new Text("alias1"));
-
+      Collection<Token<? extends TokenIdentifier>> dts = TokenCache.getAllTokens();
+      int dtsSize = 0;
+      if (dts != null)
+        dtsSize = dts.size();
+
       System.out.println("inside MAP: ts==NULL?=" + (ts==null) +
           "; #keys = " + (ts==null? 0:ts.numberOfSecretKeys()) +
           ";jobToken = " + (ts==null? "n/a":ts.getJobToken()) +
-          "; alias1 key=" + new String(key1));
+          "; alias1 key=" + new String(key1) +
+          "; dts size=" + dtsSize);

-      if(key1 == null || ts == null || ts.numberOfSecretKeys() != NUM_OF_KEYS) {
+      for (Token<? extends TokenIdentifier> t : dts) {
+        System.out.println(t.getKind() + "=" + StringUtils.byteToHexString(t.getPassword()));
+      }
+
+      if (dtsSize != 2) { // one job token and one delegation token
         throw new RuntimeException("tokens are not available"); // fail the test
-      }
+      }
+
+      if (key1 == null || ts == null || ts.numberOfSecretKeys() != NUM_OF_KEYS) {
+        throw new RuntimeException("secret keys are not available"); // fail the test
+      }
+
       super.map(key, value, output, reporter);
     }

@@ -174,6 +198,14 @@ public class TestTokenCache {
     // make sure JT starts
     jConf = mrCluster.createJobConf();

+    // provide namenode addresses for the job to get delegation tokens for;
+    // the same URI is listed twice since JOB_NAMENODES takes a comma-separated list
+    NameNode nn = dfsCluster.getNameNode();
+    URI nnUri = NameNode.getUri(nn.getNameNodeAddress());
+    jConf.set(JobContext.JOB_NAMENODES, nnUri + "," + nnUri.toString());
+    // job tracker principal id
+    jConf.set(JobContext.JOB_JOBTRACKER_ID, "jt_id");
+
     // using argument to pass the file name
     String[] args = {
        "-tokenCacheFile", tokenFileName.toString(),
@@ -214,4 +246,39 @@ public class TestTokenCache {
     }
     assertEquals("local job res is not 0", res, 0);
   }
+
+  @Test
+  public void testGetTokensForNamenodes() throws IOException {
+    FileSystem fs = dfsCluster.getFileSystem();
+
+    Path p1 = new Path("file1");
+    Path p2 = new Path("file2");
+
+    p1 = fs.makeQualified(p1);
+    // do not qualify p2
+
+    TokenCache.obtainTokensForNamenodes(new Path[] {p1, p2}, jConf);
+
+    // the token is keyed by the namenode's hostname:port
+    String fsAddr = TokenCache.buildDTServiceName(p1.toUri());
+    Token<DelegationTokenIdentifier> nnt = TokenCache.getDelegationToken(fsAddr);
+    System.out.println("dt for " + p1 + " (" + fsAddr + ") = " + nnt);
+
+    assertNotNull("Token for nn is null", nnt);
+
+    // verify the size
+    Collection<Token<? extends TokenIdentifier>> tns = TokenCache.getAllTokens();
+    assertEquals("number of tokens is not 1", 1, tns.size());
+
+    boolean found = false;
+    for (Token<? extends TokenIdentifier> t : tns) {
+      System.out.println("kind=" + t.getKind() + ";service=" + t.getService() + ";str=" + t.toString());
+
+      if (t.getKind().equals(new Text("HDFS_DELEGATION_TOKEN")) &&
+          t.getService().equals(new Text(fsAddr))) {
+        found = true;
+      }
+    }
+    assertTrue("didn't find token for " + p1, found);
+  }
 }
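A note for reviewers: the sketch below shows how submitting code could drive the token-acquisition path this test exercises. JobContext.JOB_NAMENODES, TokenCache.obtainTokensForNamenodes, TokenCache.buildDTServiceName, and TokenCache.getDelegationToken are the APIs touched by this patch; the class name, namenode address, and input path are illustrative assumptions, not values from the patch.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.security.token.DelegationTokenIdentifier;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.token.Token;

// Minimal usage sketch (illustrative, not part of the patch); assumes a
// reachable namenode at the hypothetical address below.
public class TokenCacheUsageSketch {
  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf();

    // JOB_NAMENODES is a comma-separated list of namenodes the job will access
    conf.set(JobContext.JOB_NAMENODES, "hdfs://nn.example.com:8020");

    // input paths served by that namenode (hypothetical path)
    Path[] inputs = { new Path("hdfs://nn.example.com:8020/user/alice/input") };

    // obtain and cache one delegation token per distinct namenode
    TokenCache.obtainTokensForNamenodes(inputs, conf);

    // cached tokens are keyed by a "host:port" service name built from the URI
    String service = TokenCache.buildDTServiceName(inputs[0].toUri());
    Token<DelegationTokenIdentifier> token = TokenCache.getDelegationToken(service);
    System.out.println("delegation token for " + service + " = " + token);
  }
}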