|
@@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.JobID;
|
|
|
import org.apache.hadoop.security.Credentials;
|
|
|
import org.apache.hadoop.security.token.Token;
|
|
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|
|
+import org.apache.hadoop.security.token.TokenRenewer;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
import org.junit.BeforeClass;
|
|
|
import org.junit.Test;
|
|
@@ -54,6 +55,51 @@ import org.junit.Test;
|
|
|
public class TestDelegationTokenRenewal {
|
|
|
private static final Log LOG =
|
|
|
LogFactory.getLog(TestDelegationTokenRenewal.class);
|
|
|
+ private static final Text KIND = new Text("TestDelegationTokenRenewal.Token");
|
|
|
+
|
|
|
+ public static class Renewer extends TokenRenewer {
|
|
|
+ private static int counter = 0;
|
|
|
+ private static Token<?> lastRenewed = null;
|
|
|
+ private static Token<?> tokenToRenewIn2Sec = null;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean handleKind(Text kind) {
|
|
|
+ return KIND.equals(kind);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean isManaged(Token<?> token) throws IOException {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public long renew(Token<?> t, Configuration conf) throws IOException {
|
|
|
+ MyToken token = (MyToken)t;
|
|
|
+ if(token.isCanceled()) {
|
|
|
+ throw new InvalidToken("token has been canceled");
|
|
|
+ }
|
|
|
+ lastRenewed = token;
|
|
|
+ counter ++;
|
|
|
+ LOG.info("Called MYDFS.renewdelegationtoken " + token +
|
|
|
+ ";this dfs=" + this.hashCode() + ";c=" + counter);
|
|
|
+ if(tokenToRenewIn2Sec == token) {
|
|
|
+ // this token first renewal in 2 seconds
|
|
|
+ LOG.info("RENEW in 2 seconds");
|
|
|
+ tokenToRenewIn2Sec=null;
|
|
|
+ return 2*1000 + System.currentTimeMillis();
|
|
|
+ } else {
|
|
|
+ return 86400*1000 + System.currentTimeMillis();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void cancel(Token<?> t, Configuration conf) {
|
|
|
+ MyToken token = (MyToken)t;
|
|
|
+ LOG.info("Cancel token " + token);
|
|
|
+ token.cancelToken();
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
|
|
|
private static Configuration conf;
|
|
|
|
|
@@ -69,7 +115,7 @@ public class TestDelegationTokenRenewal {
|
|
|
|
|
|
conf.setClass("fs." + uri.getScheme() + ".impl", MyFS.class, DistributedFileSystem.class);
|
|
|
FileSystem.setDefaultUri(conf, uri);
|
|
|
- System.out.println("filesystem uri = " + FileSystem.getDefaultUri(conf).toString());
|
|
|
+ LOG.info("filesystem uri = " + FileSystem.getDefaultUri(conf).toString());
|
|
|
}
|
|
|
|
|
|
private static class MyDelegationTokenSecretManager extends DelegationTokenSecretManager {
|
|
@@ -100,11 +146,14 @@ public class TestDelegationTokenRenewal {
|
|
|
public MyToken(DelegationTokenIdentifier dtId1,
|
|
|
MyDelegationTokenSecretManager sm) {
|
|
|
super(dtId1, sm);
|
|
|
+ setKind(KIND);
|
|
|
status = "GOOD";
|
|
|
}
|
|
|
|
|
|
public boolean isCanceled() {return status.equals(CANCELED);}
|
|
|
+
|
|
|
public void cancelToken() {this.status=CANCELED;}
|
|
|
+
|
|
|
public String toString() {
|
|
|
StringBuilder sb = new StringBuilder(1024);
|
|
|
|
|
@@ -130,51 +179,18 @@ public class TestDelegationTokenRenewal {
|
|
|
* exception
|
|
|
*/
|
|
|
static class MyFS extends DistributedFileSystem {
|
|
|
- int counter=0;
|
|
|
- MyToken token;
|
|
|
- MyToken tokenToRenewIn2Sec;
|
|
|
|
|
|
public MyFS() {}
|
|
|
public void close() {}
|
|
|
@Override
|
|
|
public void initialize(URI uri, Configuration conf) throws IOException {}
|
|
|
|
|
|
- @Override
|
|
|
- public long renewDelegationToken(Token<DelegationTokenIdentifier> t)
|
|
|
- throws InvalidToken, IOException {
|
|
|
- MyToken token = (MyToken)t;
|
|
|
- if(token.isCanceled()) {
|
|
|
- throw new InvalidToken("token has been canceled");
|
|
|
- }
|
|
|
- counter ++;
|
|
|
- this.token = (MyToken)token;
|
|
|
- System.out.println("Called MYDFS.renewdelegationtoken " + token +
|
|
|
- ";this dfs=" + this.hashCode() + ";c=" + counter);
|
|
|
- if(tokenToRenewIn2Sec == token) {
|
|
|
- // this token first renewal in 2 seconds
|
|
|
- System.out.println("RENEW in 2 seconds");
|
|
|
- tokenToRenewIn2Sec=null;
|
|
|
- return 2*1000 + System.currentTimeMillis();
|
|
|
- } else {
|
|
|
- return 86400*1000 + System.currentTimeMillis();
|
|
|
- }
|
|
|
- }
|
|
|
@Override
|
|
|
- public MyToken getDelegationToken(Text renewer)
|
|
|
- throws IOException {
|
|
|
- System.out.println("Called MYDFS.getdelegationtoken");
|
|
|
- return createTokens(renewer);
|
|
|
+ public MyToken getDelegationToken(Text renewer) throws IOException {
|
|
|
+ MyToken result = createTokens(renewer);
|
|
|
+ LOG.info("Called MYDFS.getdelegationtoken " + result);
|
|
|
+ return result;
|
|
|
}
|
|
|
- @Override
|
|
|
- public void cancelDelegationToken(Token<DelegationTokenIdentifier> t)
|
|
|
- throws IOException {
|
|
|
- MyToken token = (MyToken)t;
|
|
|
- token.cancelToken();
|
|
|
- }
|
|
|
-
|
|
|
- public void setTokenToRenewIn2Sec(MyToken t) {tokenToRenewIn2Sec=t;}
|
|
|
- public int getCounter() {return counter; }
|
|
|
- public MyToken getToken() {return token;}
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -222,9 +238,9 @@ public class TestDelegationTokenRenewal {
|
|
|
* @throws URISyntaxException
|
|
|
*/
|
|
|
@Test
|
|
|
- public void testDTRenewal () throws IOException, URISyntaxException {
|
|
|
+ public void testDTRenewal () throws Exception {
|
|
|
MyFS dfs = (MyFS)FileSystem.get(conf);
|
|
|
- System.out.println("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode());
|
|
|
+ LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode());
|
|
|
// Test 1. - add three tokens - make sure exactly one get's renewed
|
|
|
|
|
|
// get the delegation tokens
|
|
@@ -234,8 +250,8 @@ public class TestDelegationTokenRenewal {
|
|
|
token3 = dfs.getDelegationToken(new Text("user3"));
|
|
|
|
|
|
//to cause this one to be set for renew in 2 secs
|
|
|
- dfs.setTokenToRenewIn2Sec(token1);
|
|
|
- System.out.println("token="+token1+" should be renewed for 2 secs");
|
|
|
+ Renewer.tokenToRenewIn2Sec = token1;
|
|
|
+ LOG.info("token="+token1+" should be renewed for 2 secs");
|
|
|
|
|
|
// three distinct Namenodes
|
|
|
String nn1 = DelegationTokenRenewal.SCHEME + "://host1:0";
|
|
@@ -262,15 +278,15 @@ public class TestDelegationTokenRenewal {
|
|
|
} catch (InterruptedException e) {}
|
|
|
|
|
|
// since we cannot guarantee timely execution - let's give few chances
|
|
|
- if(dfs.getCounter()==numberOfExpectedRenewals)
|
|
|
+ if(Renewer.counter==numberOfExpectedRenewals)
|
|
|
break;
|
|
|
}
|
|
|
|
|
|
- System.out.println("dfs=" + dfs.hashCode() +
|
|
|
- ";Counter = " + dfs.getCounter() + ";t="+ dfs.getToken());
|
|
|
+ LOG.info("dfs=" + dfs.hashCode() +
|
|
|
+ ";Counter = " + Renewer.counter + ";t="+ Renewer.lastRenewed);
|
|
|
assertEquals("renew wasn't called as many times as expected(4):",
|
|
|
- numberOfExpectedRenewals, dfs.getCounter());
|
|
|
- assertEquals("most recently renewed token mismatch", dfs.getToken(),
|
|
|
+ numberOfExpectedRenewals, Renewer.counter);
|
|
|
+ assertEquals("most recently renewed token mismatch", Renewer.lastRenewed,
|
|
|
token1);
|
|
|
|
|
|
// Test 2.
|
|
@@ -281,8 +297,8 @@ public class TestDelegationTokenRenewal {
|
|
|
MyToken token4 = dfs.getDelegationToken(new Text("user4"));
|
|
|
|
|
|
//to cause this one to be set for renew in 2 secs
|
|
|
- dfs.setTokenToRenewIn2Sec(token4);
|
|
|
- System.out.println("token="+token4+" should be renewed for 2 secs");
|
|
|
+ Renewer.tokenToRenewIn2Sec = token4;
|
|
|
+ LOG.info("token="+token4+" should be renewed for 2 secs");
|
|
|
|
|
|
String nn4 = DelegationTokenRenewal.SCHEME + "://host4:0";
|
|
|
ts.addToken(new Text(nn4), token4);
|
|
@@ -291,24 +307,22 @@ public class TestDelegationTokenRenewal {
|
|
|
JobID jid2 = new JobID("job2",1);
|
|
|
DelegationTokenRenewal.registerDelegationTokensForRenewal(jid2, ts, conf);
|
|
|
DelegationTokenRenewal.removeDelegationTokenRenewalForJob(jid2);
|
|
|
- numberOfExpectedRenewals = dfs.getCounter(); // number of renewals so far
|
|
|
+ numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
|
|
|
try {
|
|
|
Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew
|
|
|
} catch (InterruptedException e) {}
|
|
|
- System.out.println("Counter = " + dfs.getCounter() + ";t="+dfs.getToken());
|
|
|
+ LOG.info("Counter = " + Renewer.counter + ";t="+ Renewer.lastRenewed);
|
|
|
|
|
|
// counter and the token should stil be the old ones
|
|
|
assertEquals("renew wasn't called as many times as expected",
|
|
|
- numberOfExpectedRenewals, dfs.getCounter());
|
|
|
+ numberOfExpectedRenewals, Renewer.counter);
|
|
|
|
|
|
// also renewing of the cancelled token should fail
|
|
|
- boolean exception=false;
|
|
|
try {
|
|
|
- dfs.renewDelegationToken(token4);
|
|
|
+ token4.renew(conf);
|
|
|
+ assertTrue("Renewal of canceled token didn't fail", false);
|
|
|
} catch (InvalidToken ite) {
|
|
|
//expected
|
|
|
- exception = true;
|
|
|
}
|
|
|
- assertTrue("Renew of canceled token didn't fail", exception);
|
|
|
}
|
|
|
}
|