|
@@ -18,33 +18,19 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager.security;
|
|
|
|
|
|
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager
|
|
|
- .ParameterizedSchedulerTestBase;
|
|
|
-import static org.junit.Assert.fail;
|
|
|
-import org.junit.Before;
|
|
|
-import static org.mockito.Matchers.any;
|
|
|
-import static org.mockito.Mockito.mock;
|
|
|
-import static org.mockito.Mockito.when;
|
|
|
-
|
|
|
-import java.io.IOException;
|
|
|
-import java.lang.annotation.Annotation;
|
|
|
-import java.net.InetSocketAddress;
|
|
|
-import java.nio.ByteBuffer;
|
|
|
-import java.security.PrivilegedAction;
|
|
|
-import java.security.PrivilegedExceptionAction;
|
|
|
-import java.util.Timer;
|
|
|
-import java.util.TimerTask;
|
|
|
-
|
|
|
-import javax.security.sasl.SaslException;
|
|
|
-
|
|
|
-import org.junit.Assert;
|
|
|
-
|
|
|
+import com.google.protobuf.BlockingService;
|
|
|
+import com.google.protobuf.RpcController;
|
|
|
+import com.google.protobuf.ServiceException;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
|
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
|
|
+import org.apache.hadoop.ipc.ProtocolInfo;
|
|
|
import org.apache.hadoop.ipc.RPC;
|
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
|
import org.apache.hadoop.ipc.Server;
|
|
|
+import org.apache.hadoop.ipc.TestRpcBase;
|
|
|
+import org.apache.hadoop.ipc.protobuf.TestProtos;
|
|
|
+import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.security.KerberosInfo;
|
|
|
import org.apache.hadoop.security.SecurityInfo;
|
|
@@ -63,9 +49,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|
|
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
|
|
-import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
|
|
@@ -75,12 +61,30 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRMWithCustomAMLauncher;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
|
import org.apache.hadoop.yarn.util.Records;
|
|
|
+import org.junit.Assert;
|
|
|
+import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
+import javax.security.sasl.SaslException;
|
|
|
+import java.io.IOException;
|
|
|
+import java.lang.annotation.Annotation;
|
|
|
+import java.net.InetSocketAddress;
|
|
|
+import java.nio.ByteBuffer;
|
|
|
+import java.security.PrivilegedAction;
|
|
|
+import java.security.PrivilegedExceptionAction;
|
|
|
+import java.util.Timer;
|
|
|
+import java.util.TimerTask;
|
|
|
+
|
|
|
+import static org.junit.Assert.fail;
|
|
|
+import static org.mockito.Matchers.any;
|
|
|
+import static org.mockito.Mockito.mock;
|
|
|
+import static org.mockito.Mockito.when;
|
|
|
+
|
|
|
public class TestClientToAMTokens extends ParameterizedSchedulerTestBase {
|
|
|
private YarnConfiguration conf;
|
|
|
|
|
@@ -89,11 +93,12 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase {
|
|
|
conf = getConf();
|
|
|
}
|
|
|
|
|
|
- private interface CustomProtocol {
|
|
|
- @SuppressWarnings("unused")
|
|
|
- public static final long versionID = 1L;
|
|
|
-
|
|
|
- public void ping() throws YarnException, IOException;
|
|
|
+ @TokenInfo(ClientToAMTokenSelector.class)
|
|
|
+ @ProtocolInfo(protocolName =
|
|
|
+ "org.apache.hadoop.yarn.server.resourcemanager.security$CustomProtocol",
|
|
|
+ protocolVersion = 1)
|
|
|
+ public interface CustomProtocol
|
|
|
+ extends TestRpcServiceProtos.CustomProto.BlockingInterface {
|
|
|
}
|
|
|
|
|
|
private static class CustomSecurityInfo extends SecurityInfo {
|
|
@@ -137,8 +142,10 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void ping() throws YarnException, IOException {
|
|
|
+ public TestProtos.EmptyResponseProto ping(RpcController unused,
|
|
|
+ TestProtos.EmptyRequestProto request) throws ServiceException {
|
|
|
this.pinged = true;
|
|
|
+ return TestProtos.EmptyResponseProto.newBuilder().build();
|
|
|
}
|
|
|
|
|
|
public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
|
|
@@ -148,6 +155,13 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase {
|
|
|
@Override
|
|
|
protected void serviceStart() throws Exception {
|
|
|
Configuration conf = getConfig();
|
|
|
+ // Set RPC engine to protobuf RPC engine
|
|
|
+ RPC.setProtocolEngine(conf, CustomProtocol.class,
|
|
|
+ ProtobufRpcEngine.class);
|
|
|
+ UserGroupInformation.setConfiguration(conf);
|
|
|
+
|
|
|
+ BlockingService service = TestRpcServiceProtos.CustomProto
|
|
|
+ .newReflectiveBlockingService(this);
|
|
|
|
|
|
Server server;
|
|
|
try {
|
|
@@ -158,7 +172,7 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase {
|
|
|
.setProtocol(CustomProtocol.class)
|
|
|
.setNumHandlers(1)
|
|
|
.setSecretManager(secretMgr)
|
|
|
- .setInstance(this).build();
|
|
|
+ .setInstance(service).build();
|
|
|
} catch (Exception e) {
|
|
|
throw new YarnRuntimeException(e);
|
|
|
}
|
|
@@ -176,6 +190,8 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase {
|
|
|
public void testClientToAMTokens() throws Exception {
|
|
|
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
|
|
"kerberos");
|
|
|
+ // Set RPC engine to protobuf RPC engine
|
|
|
+ RPC.setProtocolEngine(conf, CustomProtocol.class, ProtobufRpcEngine.class);
|
|
|
UserGroupInformation.setConfiguration(conf);
|
|
|
|
|
|
ContainerManagementProtocol containerManager =
|
|
@@ -268,10 +284,9 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase {
|
|
|
|
|
|
// Verify denial for unauthenticated user
|
|
|
try {
|
|
|
- CustomProtocol client =
|
|
|
- (CustomProtocol) RPC.getProxy(CustomProtocol.class, 1L, am.address,
|
|
|
- conf);
|
|
|
- client.ping();
|
|
|
+ CustomProtocol client = RPC.getProxy(CustomProtocol.class,
|
|
|
+ 1L, am.address, conf);
|
|
|
+ client.ping(null, TestRpcBase.newEmptyRequest());
|
|
|
fail("Access by unauthenticated user should fail!!");
|
|
|
} catch (Exception e) {
|
|
|
Assert.assertFalse(am.pinged);
|
|
@@ -335,17 +350,16 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase {
|
|
|
@Override
|
|
|
public Void run() throws Exception {
|
|
|
try {
|
|
|
- CustomProtocol client =
|
|
|
- (CustomProtocol) RPC.getProxy(CustomProtocol.class, 1L,
|
|
|
+ CustomProtocol client = RPC.getProxy(CustomProtocol.class, 1L,
|
|
|
am.address, conf);
|
|
|
- client.ping();
|
|
|
+ client.ping(null, TestRpcBase.newEmptyRequest());
|
|
|
fail("Connection initiation with illegally modified "
|
|
|
+ "tokens is expected to fail.");
|
|
|
return null;
|
|
|
- } catch (YarnException ex) {
|
|
|
- fail("Cannot get a YARN remote exception as "
|
|
|
- + "it will indicate RPC success");
|
|
|
- throw ex;
|
|
|
+ } catch (ServiceException ex) {
|
|
|
+ //fail("Cannot get a YARN remote exception as "
|
|
|
+ // + "it indicates RPC success");
|
|
|
+ throw (Exception) ex.getCause();
|
|
|
}
|
|
|
}
|
|
|
});
|
|
@@ -383,9 +397,8 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase {
|
|
|
@Override
|
|
|
public Void run() throws Exception {
|
|
|
CustomProtocol client =
|
|
|
- (CustomProtocol) RPC.getProxy(CustomProtocol.class, 1L, am.address,
|
|
|
- conf);
|
|
|
- client.ping();
|
|
|
+ RPC.getProxy(CustomProtocol.class, 1L, am.address, conf);
|
|
|
+ client.ping(null, TestRpcBase.newEmptyRequest());
|
|
|
Assert.assertTrue(am.pinged);
|
|
|
return null;
|
|
|
}
|
|
@@ -402,10 +415,9 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase {
|
|
|
ugi.doAs(new PrivilegedExceptionAction<Void>() {
|
|
|
@Override
|
|
|
public Void run() throws Exception {
|
|
|
- CustomProtocol client =
|
|
|
- (CustomProtocol) RPC.getProxy(CustomProtocol.class, 1L, am.address,
|
|
|
- conf);
|
|
|
- client.ping();
|
|
|
+ CustomProtocol client = RPC.getProxy(CustomProtocol.class,
|
|
|
+ 1L, am.address, conf);
|
|
|
+ client.ping(null, TestRpcBase.newEmptyRequest());
|
|
|
Assert.assertTrue(am.pinged);
|
|
|
return null;
|
|
|
}
|