|
@@ -18,10 +18,10 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
|
|
|
|
|
-import static org.mockito.Mockito.mock;
|
|
|
|
-import static org.mockito.Mockito.when;
|
|
|
|
-
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
|
|
+import java.net.InetAddress;
|
|
|
|
+import java.net.InetSocketAddress;
|
|
|
|
+import java.net.UnknownHostException;
|
|
import java.nio.ByteBuffer;
|
|
import java.nio.ByteBuffer;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
import org.apache.hadoop.io.DataOutputBuffer;
|
|
import org.apache.hadoop.io.DataOutputBuffer;
|
|
import org.apache.hadoop.io.Text;
|
|
import org.apache.hadoop.io.Text;
|
|
import org.apache.hadoop.security.Credentials;
|
|
import org.apache.hadoop.security.Credentials;
|
|
|
|
+import org.apache.hadoop.security.SecurityUtil;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.token.Token;
|
|
import org.apache.hadoop.security.token.Token;
|
|
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
|
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
|
@@ -63,7 +64,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
|
|
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
import org.apache.log4j.Level;
|
|
import org.apache.log4j.Level;
|
|
@@ -77,8 +77,11 @@ public class TestRMRestart {
|
|
|
|
|
|
private YarnConfiguration conf;
|
|
private YarnConfiguration conf;
|
|
|
|
|
|
|
|
+ // Fake rmAddr for token-renewal
|
|
|
|
+ private static InetSocketAddress rmAddr;
|
|
|
|
+
|
|
@Before
|
|
@Before
|
|
- public void setup() {
|
|
|
|
|
|
+ public void setup() throws UnknownHostException {
|
|
Logger rootLogger = LogManager.getRootLogger();
|
|
Logger rootLogger = LogManager.getRootLogger();
|
|
rootLogger.setLevel(Level.DEBUG);
|
|
rootLogger.setLevel(Level.DEBUG);
|
|
ExitUtil.disableSystemExit();
|
|
ExitUtil.disableSystemExit();
|
|
@@ -86,6 +89,8 @@ public class TestRMRestart {
|
|
UserGroupInformation.setConfiguration(conf);
|
|
UserGroupInformation.setConfiguration(conf);
|
|
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
|
|
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
|
|
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
|
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
|
|
|
+
|
|
|
|
+ rmAddr = new InetSocketAddress(InetAddress.getLocalHost(), 123);
|
|
}
|
|
}
|
|
|
|
|
|
@Test (timeout=180000)
|
|
@Test (timeout=180000)
|
|
@@ -446,6 +451,7 @@ public class TestRMRestart {
|
|
Token<RMDelegationTokenIdentifier> token1 =
|
|
Token<RMDelegationTokenIdentifier> token1 =
|
|
new Token<RMDelegationTokenIdentifier>(dtId1,
|
|
new Token<RMDelegationTokenIdentifier>(dtId1,
|
|
rm1.getRMDTSecretManager());
|
|
rm1.getRMDTSecretManager());
|
|
|
|
+ SecurityUtil.setTokenService(token1, rmAddr);
|
|
ts.addToken(userText1, token1);
|
|
ts.addToken(userText1, token1);
|
|
tokenSet.add(token1);
|
|
tokenSet.add(token1);
|
|
|
|
|
|
@@ -456,6 +462,7 @@ public class TestRMRestart {
|
|
Token<RMDelegationTokenIdentifier> token2 =
|
|
Token<RMDelegationTokenIdentifier> token2 =
|
|
new Token<RMDelegationTokenIdentifier>(dtId2,
|
|
new Token<RMDelegationTokenIdentifier>(dtId2,
|
|
rm1.getRMDTSecretManager());
|
|
rm1.getRMDTSecretManager());
|
|
|
|
+ SecurityUtil.setTokenService(token2, rmAddr);
|
|
ts.addToken(userText2, token2);
|
|
ts.addToken(userText2, token2);
|
|
tokenSet.add(token2);
|
|
tokenSet.add(token2);
|
|
|
|
|
|
@@ -575,6 +582,7 @@ public class TestRMRestart {
|
|
@Test
|
|
@Test
|
|
public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
|
|
public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
|
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
|
|
|
+
|
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
memStore.init(conf);
|
|
memStore.init(conf);
|
|
RMState rmState = memStore.getState();
|
|
RMState rmState = memStore.getState();
|
|
@@ -587,20 +595,21 @@ public class TestRMRestart {
|
|
rmState.getRMDTSecretManagerState().getMasterKeyState();
|
|
rmState.getRMDTSecretManagerState().getMasterKeyState();
|
|
|
|
|
|
MockRM rm1 = new TestSecurityMockRM(conf, memStore);
|
|
MockRM rm1 = new TestSecurityMockRM(conf, memStore);
|
|
|
|
+
|
|
rm1.start();
|
|
rm1.start();
|
|
|
|
|
|
// create an empty credential
|
|
// create an empty credential
|
|
Credentials ts = new Credentials();
|
|
Credentials ts = new Credentials();
|
|
|
|
|
|
// request a token and add into credential
|
|
// request a token and add into credential
|
|
- GetDelegationTokenRequest request1 = mock(GetDelegationTokenRequest.class);
|
|
|
|
- when(request1.getRenewer()).thenReturn("renewer1");
|
|
|
|
|
|
+ GetDelegationTokenRequest request1 =
|
|
|
|
+ GetDelegationTokenRequest.newInstance("renewer1");
|
|
GetDelegationTokenResponse response1 =
|
|
GetDelegationTokenResponse response1 =
|
|
rm1.getClientRMService().getDelegationToken(request1);
|
|
rm1.getClientRMService().getDelegationToken(request1);
|
|
org.apache.hadoop.yarn.api.records.Token delegationToken1 =
|
|
org.apache.hadoop.yarn.api.records.Token delegationToken1 =
|
|
response1.getRMDelegationToken();
|
|
response1.getRMDelegationToken();
|
|
Token<RMDelegationTokenIdentifier> token1 =
|
|
Token<RMDelegationTokenIdentifier> token1 =
|
|
- ConverterUtils.convertFromYarn(delegationToken1, null);
|
|
|
|
|
|
+ ConverterUtils.convertFromYarn(delegationToken1, rmAddr);
|
|
RMDelegationTokenIdentifier dtId1 = token1.decodeIdentifier();
|
|
RMDelegationTokenIdentifier dtId1 = token1.decodeIdentifier();
|
|
|
|
|
|
HashSet<RMDelegationTokenIdentifier> tokenIdentSet =
|
|
HashSet<RMDelegationTokenIdentifier> tokenIdentSet =
|
|
@@ -632,14 +641,14 @@ public class TestRMRestart {
|
|
rmState.getRMDTSecretManagerState().getDTSequenceNumber());
|
|
rmState.getRMDTSecretManagerState().getDTSequenceNumber());
|
|
|
|
|
|
// request one more token
|
|
// request one more token
|
|
- GetDelegationTokenRequest request2 = mock(GetDelegationTokenRequest.class);
|
|
|
|
- when(request2.getRenewer()).thenReturn("renewer2");
|
|
|
|
|
|
+ GetDelegationTokenRequest request2 =
|
|
|
|
+ GetDelegationTokenRequest.newInstance("renewer2");
|
|
GetDelegationTokenResponse response2 =
|
|
GetDelegationTokenResponse response2 =
|
|
rm1.getClientRMService().getDelegationToken(request2);
|
|
rm1.getClientRMService().getDelegationToken(request2);
|
|
org.apache.hadoop.yarn.api.records.Token delegationToken2 =
|
|
org.apache.hadoop.yarn.api.records.Token delegationToken2 =
|
|
response2.getRMDelegationToken();
|
|
response2.getRMDelegationToken();
|
|
Token<RMDelegationTokenIdentifier> token2 =
|
|
Token<RMDelegationTokenIdentifier> token2 =
|
|
- ConverterUtils.convertFromYarn(delegationToken2, null);
|
|
|
|
|
|
+ ConverterUtils.convertFromYarn(delegationToken2, rmAddr);
|
|
RMDelegationTokenIdentifier dtId2 = token2.decodeIdentifier();
|
|
RMDelegationTokenIdentifier dtId2 = token2.decodeIdentifier();
|
|
|
|
|
|
// cancel token2
|
|
// cancel token2
|
|
@@ -721,20 +730,10 @@ public class TestRMRestart {
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
- protected DelegationTokenRenewer createDelegationTokenRenewer() {
|
|
|
|
- return new DelegationTokenRenewer() {
|
|
|
|
- @Override
|
|
|
|
- protected void renewToken(final DelegationTokenToRenew dttr)
|
|
|
|
- throws IOException {
|
|
|
|
- // Do nothing
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- protected void setTimerForTokenRenewal(DelegationTokenToRenew token)
|
|
|
|
- throws IOException {
|
|
|
|
- // Do nothing
|
|
|
|
- }
|
|
|
|
- };
|
|
|
|
|
|
+ protected void serviceInit(Configuration conf) throws Exception {
|
|
|
|
+ super.serviceInit(conf);
|
|
|
|
+ RMDelegationTokenIdentifier.Renewer.setSecretManager(
|
|
|
|
+ this.getRMDTSecretManager(), rmAddr);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|