|
@@ -21,7 +21,6 @@ package org.apache.hadoop.ipc;
|
|
|
import java.io.ByteArrayInputStream;
|
|
|
import java.io.ByteArrayOutputStream;
|
|
|
import java.io.DataInputStream;
|
|
|
-import java.io.DataOutput;
|
|
|
import java.io.DataOutputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.lang.reflect.UndeclaredThrowableException;
|
|
@@ -46,7 +45,6 @@ import java.security.PrivilegedExceptionAction;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Collections;
|
|
|
-import java.util.EnumSet;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.Iterator;
|
|
@@ -59,7 +57,6 @@ import java.util.concurrent.BlockingQueue;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
|
|
-import javax.security.auth.callback.CallbackHandler;
|
|
|
import javax.security.sasl.Sasl;
|
|
|
import javax.security.sasl.SaslException;
|
|
|
import javax.security.sasl.SaslServer;
|
|
@@ -71,11 +68,11 @@ import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.conf.Configuration.IntegerRanges;
|
|
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
|
-import org.apache.hadoop.io.BytesWritable;
|
|
|
import org.apache.hadoop.io.DataOutputBuffer;
|
|
|
-import org.apache.hadoop.io.IntWritable;
|
|
|
import org.apache.hadoop.io.Writable;
|
|
|
import org.apache.hadoop.io.WritableUtils;
|
|
|
+import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
|
|
|
+import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper;
|
|
|
import org.apache.hadoop.ipc.RPC.RpcInvoker;
|
|
|
import org.apache.hadoop.ipc.RPC.VersionMismatch;
|
|
|
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
|
|
@@ -83,18 +80,15 @@ import org.apache.hadoop.ipc.metrics.RpcMetrics;
|
|
|
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
|
|
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
|
|
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
|
|
|
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcSaslProto.*;
|
|
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.*;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.security.AccessControlException;
|
|
|
import org.apache.hadoop.security.SaslRpcServer;
|
|
|
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
|
|
|
-import org.apache.hadoop.security.SaslRpcServer.SaslDigestCallbackHandler;
|
|
|
-import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
|
|
|
-import org.apache.hadoop.security.SaslRpcServer.SaslStatus;
|
|
|
import org.apache.hadoop.security.SecurityUtil;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
|
|
-import org.apache.hadoop.security.authentication.util.KerberosName;
|
|
|
import org.apache.hadoop.security.authorize.AuthorizationException;
|
|
|
import org.apache.hadoop.security.authorize.PolicyProvider;
|
|
|
import org.apache.hadoop.security.authorize.ProxyUsers;
|
|
@@ -108,7 +102,9 @@ import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.util.Time;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
+import com.google.protobuf.ByteString;
|
|
|
import com.google.protobuf.CodedOutputStream;
|
|
|
+import com.google.protobuf.Message;
|
|
|
|
|
|
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
|
|
|
* parameter, and return a {@link Writable} as their value. A service runs on
|
|
@@ -118,7 +114,8 @@ import com.google.protobuf.CodedOutputStream;
|
|
|
*/
|
|
|
public abstract class Server {
|
|
|
private final boolean authorize;
|
|
|
- private EnumSet<AuthMethod> enabledAuthMethods;
|
|
|
+ private List<AuthMethod> enabledAuthMethods;
|
|
|
+ private RpcSaslProto negotiateResponse;
|
|
|
private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();
|
|
|
|
|
|
public void addTerseExceptions(Class<?>... exceptionClass) {
|
|
@@ -1062,6 +1059,26 @@ public abstract class Server {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @InterfaceAudience.Private
|
|
|
+ public static enum AuthProtocol {
|
|
|
+ NONE(0),
|
|
|
+ SASL(-33);
|
|
|
+
|
|
|
+ public final int callId;
|
|
|
+ AuthProtocol(int callId) {
|
|
|
+ this.callId = callId;
|
|
|
+ }
|
|
|
+
|
|
|
+ static AuthProtocol valueOf(int callId) {
|
|
|
+ for (AuthProtocol authType : AuthProtocol.values()) {
|
|
|
+ if (authType.callId == callId) {
|
|
|
+ return authType;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
/** Reads calls from a connection and queues them for handling. */
|
|
|
public class Connection {
|
|
|
private boolean connectionHeaderRead = false; // connection header is read?
|
|
@@ -1086,6 +1103,7 @@ public abstract class Server {
|
|
|
String protocolName;
|
|
|
SaslServer saslServer;
|
|
|
private AuthMethod authMethod;
|
|
|
+ private AuthProtocol authProtocol;
|
|
|
private boolean saslContextEstablished;
|
|
|
private boolean skipInitialSaslHandshake;
|
|
|
private ByteBuffer connectionHeaderBuf = null;
|
|
@@ -1101,12 +1119,11 @@ public abstract class Server {
|
|
|
private final Call authFailedCall =
|
|
|
new Call(AUTHORIZATION_FAILED_CALLID, null, this);
|
|
|
private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
|
|
|
- // Fake 'call' for SASL context setup
|
|
|
- private static final int SASL_CALLID = -33;
|
|
|
|
|
|
- private final Call saslCall = new Call(SASL_CALLID, null, this);
|
|
|
+ private final Call saslCall = new Call(AuthProtocol.SASL.callId, null, this);
|
|
|
private final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream();
|
|
|
|
|
|
+ private boolean sentNegotiate = false;
|
|
|
private boolean useWrap = false;
|
|
|
|
|
|
public Connection(SelectionKey key, SocketChannel channel,
|
|
@@ -1180,7 +1197,7 @@ public abstract class Server {
|
|
|
|
|
|
private UserGroupInformation getAuthorizedUgi(String authorizedId)
|
|
|
throws IOException {
|
|
|
- if (authMethod == SaslRpcServer.AuthMethod.DIGEST) {
|
|
|
+ if (authMethod == AuthMethod.TOKEN) {
|
|
|
TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authorizedId,
|
|
|
secretManager);
|
|
|
UserGroupInformation ugi = tokenId.getUser();
|
|
@@ -1198,12 +1215,9 @@ public abstract class Server {
|
|
|
private void saslReadAndProcess(byte[] saslToken) throws IOException,
|
|
|
InterruptedException {
|
|
|
if (!saslContextEstablished) {
|
|
|
- byte[] replyToken = null;
|
|
|
+ RpcSaslProto saslResponse;
|
|
|
try {
|
|
|
- if (LOG.isDebugEnabled())
|
|
|
- LOG.debug("Have read input token of size " + saslToken.length
|
|
|
- + " for processing by saslServer.evaluateResponse()");
|
|
|
- replyToken = saslServer.evaluateResponse(saslToken);
|
|
|
+ saslResponse = processSaslMessage(saslToken);
|
|
|
} catch (IOException e) {
|
|
|
IOException sendToClient = e;
|
|
|
Throwable cause = e;
|
|
@@ -1214,27 +1228,17 @@ public abstract class Server {
|
|
|
}
|
|
|
cause = cause.getCause();
|
|
|
}
|
|
|
- doSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
|
|
|
- sendToClient.getLocalizedMessage());
|
|
|
rpcMetrics.incrAuthenticationFailures();
|
|
|
String clientIP = this.toString();
|
|
|
// attempting user could be null
|
|
|
AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser +
|
|
|
" (" + e.getLocalizedMessage() + ")");
|
|
|
+ // wait to send response until failure is logged
|
|
|
+ doSaslReply(sendToClient);
|
|
|
throw e;
|
|
|
}
|
|
|
- if (saslServer.isComplete() && replyToken == null) {
|
|
|
- // send final response for success
|
|
|
- replyToken = new byte[0];
|
|
|
- }
|
|
|
- if (replyToken != null) {
|
|
|
- if (LOG.isDebugEnabled())
|
|
|
- LOG.debug("Will send token of size " + replyToken.length
|
|
|
- + " from saslServer.");
|
|
|
- doSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
|
|
|
- null);
|
|
|
- }
|
|
|
- if (saslServer.isComplete()) {
|
|
|
+
|
|
|
+ if (saslServer != null && saslServer.isComplete()) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("SASL server context established. Negotiated QoP is "
|
|
|
+ saslServer.getNegotiatedProperty(Sasl.QOP));
|
|
@@ -1249,6 +1253,9 @@ public abstract class Server {
|
|
|
AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user);
|
|
|
saslContextEstablished = true;
|
|
|
}
|
|
|
+ // send reply here to avoid a successful auth being logged as a
|
|
|
+ // failure if response can't be sent
|
|
|
+ doSaslReply(saslResponse);
|
|
|
} else {
|
|
|
if (LOG.isDebugEnabled())
|
|
|
LOG.debug("Have read input token of size " + saslToken.length
|
|
@@ -1264,21 +1271,101 @@ public abstract class Server {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void doSaslReply(SaslStatus status, Writable rv,
|
|
|
- String errorClass, String error) throws IOException {
|
|
|
- saslResponse.reset();
|
|
|
- DataOutputStream out = new DataOutputStream(saslResponse);
|
|
|
- out.writeInt(status.state); // write status
|
|
|
- if (status == SaslStatus.SUCCESS) {
|
|
|
- rv.write(out);
|
|
|
- } else {
|
|
|
- WritableUtils.writeString(out, errorClass);
|
|
|
- WritableUtils.writeString(out, error);
|
|
|
+ private RpcSaslProto processSaslMessage(byte[] buf)
|
|
|
+ throws IOException, InterruptedException {
|
|
|
+ final DataInputStream dis =
|
|
|
+ new DataInputStream(new ByteArrayInputStream(buf));
|
|
|
+ RpcRequestMessageWrapper requestWrapper = new RpcRequestMessageWrapper();
|
|
|
+ requestWrapper.readFields(dis);
|
|
|
+
|
|
|
+ final RpcRequestHeaderProto rpcHeader = requestWrapper.requestHeader;
|
|
|
+ if (rpcHeader.getCallId() != AuthProtocol.SASL.callId) {
|
|
|
+ throw new SaslException("Client sent non-SASL request");
|
|
|
+ }
|
|
|
+ final RpcSaslProto saslMessage =
|
|
|
+ RpcSaslProto.parseFrom(requestWrapper.theRequestRead);
|
|
|
+ RpcSaslProto saslResponse = null;
|
|
|
+ final SaslState state = saslMessage.getState(); // required
|
|
|
+ switch (state) {
|
|
|
+ case NEGOTIATE: {
|
|
|
+ if (sentNegotiate) {
|
|
|
+ throw new AccessControlException(
|
|
|
+ "Client already attempted negotiation");
|
|
|
+ }
|
|
|
+ saslResponse = buildSaslNegotiateResponse();
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ case INITIATE: {
|
|
|
+ if (saslMessage.getAuthsCount() != 1) {
|
|
|
+ throw new SaslException("Client mechanism is malformed");
|
|
|
+ }
|
|
|
+ String authMethodName = saslMessage.getAuths(0).getMethod();
|
|
|
+ authMethod = createSaslServer(authMethodName);
|
|
|
+ if (authMethod == null) { // the auth method is not supported
|
|
|
+ if (sentNegotiate) {
|
|
|
+ throw new AccessControlException(
|
|
|
+ authMethodName + " authentication is not enabled."
|
|
|
+ + " Available:" + enabledAuthMethods);
|
|
|
+ }
|
|
|
+ saslResponse = buildSaslNegotiateResponse();
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ // fallthru to process sasl token
|
|
|
+ }
|
|
|
+ case RESPONSE: {
|
|
|
+ if (!saslMessage.hasToken()) {
|
|
|
+ throw new SaslException("Client did not send a token");
|
|
|
+ }
|
|
|
+ byte[] saslToken = saslMessage.getToken().toByteArray();
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Have read input token of size " + saslToken.length
|
|
|
+ + " for processing by saslServer.evaluateResponse()");
|
|
|
+ }
|
|
|
+ saslToken = saslServer.evaluateResponse(saslToken);
|
|
|
+ saslResponse = buildSaslResponse(
|
|
|
+ saslServer.isComplete() ? SaslState.SUCCESS : SaslState.CHALLENGE,
|
|
|
+ saslToken);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ default:
|
|
|
+ throw new SaslException("Client sent unsupported state " + state);
|
|
|
+ }
|
|
|
+ return saslResponse;
|
|
|
+ }
|
|
|
+
|
|
|
+ private RpcSaslProto buildSaslResponse(SaslState state, byte[] replyToken)
|
|
|
+ throws IOException {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Will send " + state + " token of size "
|
|
|
+ + ((replyToken != null) ? replyToken.length : null)
|
|
|
+ + " from saslServer.");
|
|
|
+ }
|
|
|
+ RpcSaslProto.Builder response = RpcSaslProto.newBuilder();
|
|
|
+ response.setState(state);
|
|
|
+ if (replyToken != null) {
|
|
|
+ response.setToken(ByteString.copyFrom(replyToken));
|
|
|
+ }
|
|
|
+ return response.build();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void doSaslReply(Message message)
|
|
|
+ throws IOException {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Sending sasl message "+message);
|
|
|
}
|
|
|
- saslCall.setResponse(ByteBuffer.wrap(saslResponse.toByteArray()));
|
|
|
+ setupResponse(saslResponse, saslCall,
|
|
|
+ RpcStatusProto.SUCCESS, null,
|
|
|
+ new RpcResponseWrapper(message), null, null);
|
|
|
responder.doRespond(saslCall);
|
|
|
}
|
|
|
|
|
|
+ private void doSaslReply(Exception ioe) throws IOException {
|
|
|
+ setupResponse(authFailedResponse, authFailedCall,
|
|
|
+ RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_UNAUTHORIZED,
|
|
|
+ null, ioe.getClass().getName(), ioe.getLocalizedMessage());
|
|
|
+ responder.doRespond(authFailedCall);
|
|
|
+ }
|
|
|
+
|
|
|
private void disposeSasl() {
|
|
|
if (saslServer != null) {
|
|
|
try {
|
|
@@ -1312,10 +1399,6 @@ public abstract class Server {
|
|
|
int version = connectionHeaderBuf.get(0);
|
|
|
// TODO we should add handler for service class later
|
|
|
this.setServiceClass(connectionHeaderBuf.get(1));
|
|
|
-
|
|
|
- byte[] method = new byte[] {connectionHeaderBuf.get(2)};
|
|
|
- authMethod = AuthMethod.read(new DataInputStream(
|
|
|
- new ByteArrayInputStream(method)));
|
|
|
dataLengthBuffer.flip();
|
|
|
|
|
|
// Check if it looks like the user is hitting an IPC port
|
|
@@ -1336,14 +1419,10 @@ public abstract class Server {
|
|
|
return -1;
|
|
|
}
|
|
|
|
|
|
- dataLengthBuffer.clear();
|
|
|
- if (authMethod == null) {
|
|
|
- throw new IOException("Unable to read authentication method");
|
|
|
- }
|
|
|
-
|
|
|
- // this may create a SASL server, or switch us into SIMPLE
|
|
|
- authMethod = initializeAuthContext(authMethod);
|
|
|
+ // this may switch us into SIMPLE
|
|
|
+ authProtocol = initializeAuthContext(connectionHeaderBuf.get(2));
|
|
|
|
|
|
+ dataLengthBuffer.clear();
|
|
|
connectionHeaderBuf = null;
|
|
|
connectionHeaderRead = true;
|
|
|
continue;
|
|
@@ -1370,14 +1449,14 @@ public abstract class Server {
|
|
|
if (data.remaining() == 0) {
|
|
|
dataLengthBuffer.clear();
|
|
|
data.flip();
|
|
|
- if (skipInitialSaslHandshake) {
|
|
|
- data = null;
|
|
|
- skipInitialSaslHandshake = false;
|
|
|
- continue;
|
|
|
- }
|
|
|
boolean isHeaderRead = connectionContextRead;
|
|
|
- if (saslServer != null) {
|
|
|
- saslReadAndProcess(data.array());
|
|
|
+ if (authProtocol == AuthProtocol.SASL) {
|
|
|
+ // switch to simple must ignore next negotiate or initiate
|
|
|
+ if (skipInitialSaslHandshake) {
|
|
|
+ authProtocol = AuthProtocol.NONE;
|
|
|
+ } else {
|
|
|
+ saslReadAndProcess(data.array());
|
|
|
+ }
|
|
|
} else {
|
|
|
processOneRpc(data.array());
|
|
|
}
|
|
@@ -1390,102 +1469,79 @@ public abstract class Server {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private AuthMethod initializeAuthContext(AuthMethod authMethod)
|
|
|
+ private AuthProtocol initializeAuthContext(int authType)
|
|
|
throws IOException, InterruptedException {
|
|
|
- try {
|
|
|
- if (enabledAuthMethods.contains(authMethod)) {
|
|
|
- saslServer = createSaslServer(authMethod);
|
|
|
- } else if (enabledAuthMethods.contains(AuthMethod.SIMPLE)) {
|
|
|
- doSaslReply(SaslStatus.SUCCESS, new IntWritable(
|
|
|
- SaslRpcServer.SWITCH_TO_SIMPLE_AUTH), null, null);
|
|
|
- authMethod = AuthMethod.SIMPLE;
|
|
|
- // client has already sent the initial Sasl message and we
|
|
|
- // should ignore it. Both client and server should fall back
|
|
|
- // to simple auth from now on.
|
|
|
- skipInitialSaslHandshake = true;
|
|
|
- } else {
|
|
|
- throw new AccessControlException(
|
|
|
- authMethod + " authentication is not enabled."
|
|
|
- + " Available:" + enabledAuthMethods);
|
|
|
- }
|
|
|
- } catch (IOException ioe) {
|
|
|
- final String ioeClass = ioe.getClass().getName();
|
|
|
- final String ioeMessage = ioe.getLocalizedMessage();
|
|
|
- if (authMethod == AuthMethod.SIMPLE) {
|
|
|
- setupResponse(authFailedResponse, authFailedCall,
|
|
|
- RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_UNAUTHORIZED,
|
|
|
- null, ioeClass, ioeMessage);
|
|
|
- responder.doRespond(authFailedCall);
|
|
|
- } else {
|
|
|
- doSaslReply(SaslStatus.ERROR, null, ioeClass, ioeMessage);
|
|
|
- }
|
|
|
- throw ioe;
|
|
|
+ AuthProtocol authProtocol = AuthProtocol.valueOf(authType);
|
|
|
+ if (authProtocol == null) {
|
|
|
+ IOException ioe = new IpcException("Unknown auth protocol:" + authType);
|
|
|
+ doSaslReply(ioe);
|
|
|
+ throw ioe;
|
|
|
}
|
|
|
- return authMethod;
|
|
|
- }
|
|
|
-
|
|
|
- private SaslServer createSaslServer(AuthMethod authMethod)
|
|
|
- throws IOException, InterruptedException {
|
|
|
- String hostname = null;
|
|
|
- String saslProtocol = null;
|
|
|
- CallbackHandler saslCallback = null;
|
|
|
-
|
|
|
- switch (authMethod) {
|
|
|
- case SIMPLE: {
|
|
|
- return null; // no sasl for simple
|
|
|
- }
|
|
|
- case DIGEST: {
|
|
|
- secretManager.checkAvailableForRead();
|
|
|
- hostname = SaslRpcServer.SASL_DEFAULT_REALM;
|
|
|
- saslCallback = new SaslDigestCallbackHandler(secretManager, this);
|
|
|
+ boolean isSimpleEnabled = enabledAuthMethods.contains(AuthMethod.SIMPLE);
|
|
|
+ switch (authProtocol) {
|
|
|
+ case NONE: {
|
|
|
+ // don't reply if client is simple and server is insecure
|
|
|
+ if (!isSimpleEnabled) {
|
|
|
+ IOException ioe = new AccessControlException(
|
|
|
+ "SIMPLE authentication is not enabled."
|
|
|
+ + " Available:" + enabledAuthMethods);
|
|
|
+ doSaslReply(ioe);
|
|
|
+ throw ioe;
|
|
|
+ }
|
|
|
break;
|
|
|
}
|
|
|
- case KERBEROS: {
|
|
|
- String fullName = UserGroupInformation.getCurrentUser().getUserName();
|
|
|
- if (LOG.isDebugEnabled())
|
|
|
- LOG.debug("Kerberos principal name is " + fullName);
|
|
|
- KerberosName krbName = new KerberosName(fullName);
|
|
|
- hostname = krbName.getHostName();
|
|
|
- if (hostname == null) {
|
|
|
- throw new AccessControlException(
|
|
|
- "Kerberos principal name does NOT have the expected "
|
|
|
- + "hostname part: " + fullName);
|
|
|
+ case SASL: {
|
|
|
+ if (isSimpleEnabled) { // switch to simple hack
|
|
|
+ skipInitialSaslHandshake = true;
|
|
|
+ doSaslReply(buildSaslResponse(SaslState.SUCCESS, null));
|
|
|
}
|
|
|
- saslProtocol = krbName.getServiceName();
|
|
|
- saslCallback = new SaslGssCallbackHandler();
|
|
|
+ // else wait for a negotiate or initiate
|
|
|
break;
|
|
|
}
|
|
|
- default:
|
|
|
- // we should never be able to get here
|
|
|
- throw new AccessControlException(
|
|
|
- "Server does not support SASL " + authMethod);
|
|
|
}
|
|
|
-
|
|
|
- return createSaslServer(authMethod.getMechanismName(), saslProtocol,
|
|
|
- hostname, saslCallback);
|
|
|
- }
|
|
|
-
|
|
|
- private SaslServer createSaslServer(final String mechanism,
|
|
|
- final String protocol,
|
|
|
- final String hostname,
|
|
|
- final CallbackHandler callback
|
|
|
- ) throws IOException, InterruptedException {
|
|
|
- SaslServer saslServer = UserGroupInformation.getCurrentUser().doAs(
|
|
|
- new PrivilegedExceptionAction<SaslServer>() {
|
|
|
- @Override
|
|
|
- public SaslServer run() throws SaslException {
|
|
|
- return Sasl.createSaslServer(mechanism, protocol, hostname,
|
|
|
- SaslRpcServer.SASL_PROPS, callback);
|
|
|
- }
|
|
|
- });
|
|
|
- if (saslServer == null) {
|
|
|
- throw new AccessControlException(
|
|
|
- "Unable to find SASL server implementation for " + mechanism);
|
|
|
+ return authProtocol;
|
|
|
+ }
|
|
|
+
|
|
|
+ private RpcSaslProto buildSaslNegotiateResponse()
|
|
|
+ throws IOException, InterruptedException {
|
|
|
+ RpcSaslProto negotiateMessage = negotiateResponse;
|
|
|
+ // accelerate token negotiation by sending initial challenge
|
|
|
+ // in the negotiation response
|
|
|
+ if (enabledAuthMethods.contains(AuthMethod.TOKEN)) {
|
|
|
+ saslServer = createSaslServer(AuthMethod.TOKEN);
|
|
|
+ byte[] challenge = saslServer.evaluateResponse(new byte[0]);
|
|
|
+ RpcSaslProto.Builder negotiateBuilder =
|
|
|
+ RpcSaslProto.newBuilder(negotiateResponse);
|
|
|
+ negotiateBuilder.getAuthsBuilder(0) // TOKEN is always first
|
|
|
+ .setChallenge(ByteString.copyFrom(challenge));
|
|
|
+ negotiateMessage = negotiateBuilder.build();
|
|
|
}
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Created SASL server with mechanism = " + mechanism);
|
|
|
+ sentNegotiate = true;
|
|
|
+ return negotiateMessage;
|
|
|
+ }
|
|
|
+
|
|
|
+ private AuthMethod createSaslServer(String authMethodName)
|
|
|
+ throws IOException, InterruptedException {
|
|
|
+ AuthMethod authMethod;
|
|
|
+ try {
|
|
|
+ authMethod = AuthMethod.valueOf(authMethodName);
|
|
|
+ if (!enabledAuthMethods.contains(authMethod)) {
|
|
|
+ authMethod = null;
|
|
|
+ }
|
|
|
+ } catch (IllegalArgumentException iae) {
|
|
|
+ authMethod = null;
|
|
|
+ }
|
|
|
+ if (authMethod != null &&
|
|
|
+ // sasl server for tokens may already be instantiated
|
|
|
+ (saslServer == null || authMethod != AuthMethod.TOKEN)) {
|
|
|
+ saslServer = createSaslServer(authMethod);
|
|
|
}
|
|
|
- return saslServer;
|
|
|
+ return authMethod;
|
|
|
+ }
|
|
|
+
|
|
|
+ private SaslServer createSaslServer(AuthMethod authMethod)
|
|
|
+ throws IOException, InterruptedException {
|
|
|
+ return new SaslRpcServer(authMethod).create(this, secretManager);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1554,7 +1610,7 @@ public abstract class Server {
|
|
|
//this is not allowed if user authenticated with DIGEST.
|
|
|
if ((protocolUser != null)
|
|
|
&& (!protocolUser.getUserName().equals(user.getUserName()))) {
|
|
|
- if (authMethod == AuthMethod.DIGEST) {
|
|
|
+ if (authMethod == AuthMethod.TOKEN) {
|
|
|
// Not allowed to doAs if token authentication is used
|
|
|
throw new AccessControlException("Authenticated user (" + user
|
|
|
+ ") doesn't match what the client claims to be ("
|
|
@@ -1710,7 +1766,7 @@ public abstract class Server {
|
|
|
// authorize real user. doAs is allowed only for simple or kerberos
|
|
|
// authentication
|
|
|
if (user != null && user.getRealUser() != null
|
|
|
- && (authMethod != AuthMethod.DIGEST)) {
|
|
|
+ && (authMethod != AuthMethod.TOKEN)) {
|
|
|
ProxyUsers.authorize(user, this.getHostAddress(), conf);
|
|
|
}
|
|
|
authorize(user, protocolName, getHostInetAddress());
|
|
@@ -1951,6 +2007,7 @@ public abstract class Server {
|
|
|
|
|
|
// configure supported authentications
|
|
|
this.enabledAuthMethods = getAuthMethods(secretManager, conf);
|
|
|
+ this.negotiateResponse = buildNegotiateResponse(enabledAuthMethods);
|
|
|
|
|
|
// Start the listener here and let it bind to the port
|
|
|
listener = new Listener();
|
|
@@ -1970,17 +2027,33 @@ public abstract class Server {
|
|
|
|
|
|
this.exceptionsHandler.addTerseExceptions(StandbyException.class);
|
|
|
}
|
|
|
+
|
|
|
+ private RpcSaslProto buildNegotiateResponse(List<AuthMethod> authMethods)
|
|
|
+ throws IOException {
|
|
|
+ RpcSaslProto.Builder negotiateBuilder = RpcSaslProto.newBuilder();
|
|
|
+ negotiateBuilder.setState(SaslState.NEGOTIATE);
|
|
|
+ for (AuthMethod authMethod : authMethods) {
|
|
|
+ if (authMethod == AuthMethod.SIMPLE) { // not a SASL method
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ SaslRpcServer saslRpcServer = new SaslRpcServer(authMethod);
|
|
|
+ negotiateBuilder.addAuthsBuilder()
|
|
|
+ .setMethod(authMethod.toString())
|
|
|
+ .setMechanism(saslRpcServer.mechanism)
|
|
|
+ .setProtocol(saslRpcServer.protocol)
|
|
|
+ .setServerId(saslRpcServer.serverId);
|
|
|
+ }
|
|
|
+ return negotiateBuilder.build();
|
|
|
+ }
|
|
|
|
|
|
// get the security type from the conf. implicitly include token support
|
|
|
// if a secret manager is provided, or fail if token is the conf value but
|
|
|
// there is no secret manager
|
|
|
- private EnumSet<AuthMethod> getAuthMethods(SecretManager<?> secretManager,
|
|
|
+ private List<AuthMethod> getAuthMethods(SecretManager<?> secretManager,
|
|
|
Configuration conf) {
|
|
|
AuthenticationMethod confAuthenticationMethod =
|
|
|
SecurityUtil.getAuthenticationMethod(conf);
|
|
|
- EnumSet<AuthMethod> authMethods =
|
|
|
- EnumSet.of(confAuthenticationMethod.getAuthMethod());
|
|
|
-
|
|
|
+ List<AuthMethod> authMethods = new ArrayList<AuthMethod>();
|
|
|
if (confAuthenticationMethod == AuthenticationMethod.TOKEN) {
|
|
|
if (secretManager == null) {
|
|
|
throw new IllegalArgumentException(AuthenticationMethod.TOKEN +
|
|
@@ -1989,8 +2062,10 @@ public abstract class Server {
|
|
|
} else if (secretManager != null) {
|
|
|
LOG.debug(AuthenticationMethod.TOKEN +
|
|
|
" authentication enabled for secret manager");
|
|
|
+ // most preferred, go to the front of the line!
|
|
|
authMethods.add(AuthenticationMethod.TOKEN.getAuthMethod());
|
|
|
}
|
|
|
+ authMethods.add(confAuthenticationMethod.getAuthMethod());
|
|
|
|
|
|
LOG.debug("Server accepts auth methods:" + authMethods);
|
|
|
return authMethods;
|