|
@@ -18,8 +18,14 @@
|
|
|
|
|
|
package org.apache.hadoop.ipc;
|
|
|
|
|
|
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
import java.net.InetSocketAddress;
|
|
|
|
|
|
+import javax.security.sasl.Sasl;
|
|
|
+
|
|
|
+import junit.framework.Assert;
|
|
|
import junit.framework.TestCase;
|
|
|
|
|
|
import org.apache.avro.ipc.AvroRemoteException;
|
|
@@ -27,7 +33,16 @@ import org.apache.avro.util.Utf8;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.io.Text;
|
|
|
+import org.apache.hadoop.ipc.TestSaslRPC.CustomSecurityInfo;
|
|
|
+import org.apache.hadoop.ipc.TestSaslRPC.TestTokenIdentifier;
|
|
|
+import org.apache.hadoop.ipc.TestSaslRPC.TestTokenSecretManager;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
+import org.apache.hadoop.security.SaslRpcServer;
|
|
|
+import org.apache.hadoop.security.SecurityInfo;
|
|
|
+import org.apache.hadoop.security.SecurityUtil;
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.security.token.Token;
|
|
|
|
|
|
/** Unit tests for AvroRpc. */
|
|
|
public class TestAvroRpc extends TestCase {
|
|
@@ -36,8 +51,6 @@ public class TestAvroRpc extends TestCase {
|
|
|
public static final Log LOG =
|
|
|
LogFactory.getLog(TestAvroRpc.class);
|
|
|
|
|
|
- private static Configuration conf = new Configuration();
|
|
|
-
|
|
|
int datasize = 1024*100;
|
|
|
int numThreads = 50;
|
|
|
|
|
@@ -56,19 +69,47 @@ public class TestAvroRpc extends TestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void testCalls() throws Exception {
|
|
|
+ public void testReflect() throws Exception {
|
|
|
+ testReflect(false);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testSecureReflect() throws Exception {
|
|
|
+ testReflect(true);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testSpecific() throws Exception {
|
|
|
+ testSpecific(false);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testSecureSpecific() throws Exception {
|
|
|
+ testSpecific(true);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void testReflect(boolean secure) throws Exception {
|
|
|
Configuration conf = new Configuration();
|
|
|
+ TestTokenSecretManager sm = null;
|
|
|
+ if (secure) {
|
|
|
+ makeSecure(conf);
|
|
|
+ sm = new TestTokenSecretManager();
|
|
|
+ }
|
|
|
+ UserGroupInformation.setConfiguration(conf);
|
|
|
RPC.setProtocolEngine(conf, AvroTestProtocol.class, AvroRpcEngine.class);
|
|
|
Server server = RPC.getServer(AvroTestProtocol.class,
|
|
|
- new TestImpl(), ADDRESS, 0, conf);
|
|
|
- AvroTestProtocol proxy = null;
|
|
|
+ new TestImpl(), ADDRESS, 0, 5, true,
|
|
|
+ conf, sm);
|
|
|
try {
|
|
|
server.start();
|
|
|
-
|
|
|
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
|
|
- proxy =
|
|
|
+
|
|
|
+ if (secure) {
|
|
|
+ addToken(sm, addr);
|
|
|
+ //QOP must be auth
|
|
|
+ Assert.assertEquals("auth", SaslRpcServer.SASL_PROPS.get(Sasl.QOP));
|
|
|
+ }
|
|
|
+
|
|
|
+ AvroTestProtocol proxy =
|
|
|
(AvroTestProtocol)RPC.getProxy(AvroTestProtocol.class, 0, addr, conf);
|
|
|
-
|
|
|
+
|
|
|
proxy.ping();
|
|
|
|
|
|
String echo = proxy.echo("hello world");
|
|
@@ -89,23 +130,62 @@ public class TestAvroRpc extends TestCase {
|
|
|
assertTrue(caught);
|
|
|
|
|
|
} finally {
|
|
|
+ resetSecurity();
|
|
|
server.stop();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void testAvroSpecificRpc() throws Exception {
|
|
|
+ private void makeSecure(Configuration conf) {
|
|
|
+ conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
|
|
|
+ conf.set("hadoop.rpc.socket.factory.class.default", "");
|
|
|
+ //Avro doesn't work with security annotations on protocol.
|
|
|
+ //Avro works ONLY with custom security context
|
|
|
+ SecurityUtil.setSecurityInfoProviders(new CustomSecurityInfo());
|
|
|
+ }
|
|
|
+
|
|
|
+ private void resetSecurity() {
|
|
|
+ SecurityUtil.setSecurityInfoProviders(new SecurityInfo[0]);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void addToken(TestTokenSecretManager sm,
|
|
|
+ InetSocketAddress addr) throws IOException {
|
|
|
+ final UserGroupInformation current = UserGroupInformation.getCurrentUser();
|
|
|
+
|
|
|
+ TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current
|
|
|
+ .getUserName()));
|
|
|
+ Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId,
|
|
|
+ sm);
|
|
|
+ Text host = new Text(addr.getAddress().getHostAddress() + ":"
|
|
|
+ + addr.getPort());
|
|
|
+ token.setService(host);
|
|
|
+ LOG.info("Service IP address for token is " + host);
|
|
|
+ current.addToken(token);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void testSpecific(boolean secure) throws Exception {
|
|
|
Configuration conf = new Configuration();
|
|
|
+ TestTokenSecretManager sm = null;
|
|
|
+ if (secure) {
|
|
|
+ makeSecure(conf);
|
|
|
+ sm = new TestTokenSecretManager();
|
|
|
+ }
|
|
|
+ UserGroupInformation.setConfiguration(conf);
|
|
|
RPC.setProtocolEngine(conf, AvroSpecificTestProtocol.class,
|
|
|
AvroSpecificRpcEngine.class);
|
|
|
Server server = RPC.getServer(AvroSpecificTestProtocol.class,
|
|
|
- new AvroSpecificTestProtocolImpl(),
|
|
|
- ADDRESS, 0, conf);
|
|
|
- AvroSpecificTestProtocol proxy = null;
|
|
|
+ new AvroSpecificTestProtocolImpl(), ADDRESS, 0, 5, true,
|
|
|
+ conf, sm);
|
|
|
try {
|
|
|
server.start();
|
|
|
-
|
|
|
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
|
|
- proxy =
|
|
|
+
|
|
|
+ if (secure) {
|
|
|
+ addToken(sm, addr);
|
|
|
+ //QOP must be auth
|
|
|
+ Assert.assertEquals("auth", SaslRpcServer.SASL_PROPS.get(Sasl.QOP));
|
|
|
+ }
|
|
|
+
|
|
|
+ AvroSpecificTestProtocol proxy =
|
|
|
(AvroSpecificTestProtocol)RPC.getProxy(AvroSpecificTestProtocol.class,
|
|
|
0, addr, conf);
|
|
|
|
|
@@ -116,6 +196,7 @@ public class TestAvroRpc extends TestCase {
|
|
|
assertEquals(3, intResult);
|
|
|
|
|
|
} finally {
|
|
|
+ resetSecurity();
|
|
|
server.stop();
|
|
|
}
|
|
|
}
|
|
@@ -134,5 +215,5 @@ public class TestAvroRpc extends TestCase {
|
|
|
}
|
|
|
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
}
|