|
@@ -60,12 +60,15 @@ import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
|
+import org.apache.hadoop.io.BytesWritable;
|
|
|
+import org.apache.hadoop.io.IntWritable;
|
|
|
import org.apache.hadoop.io.Writable;
|
|
|
import org.apache.hadoop.io.WritableUtils;
|
|
|
import org.apache.hadoop.ipc.metrics.RpcMetrics;
|
|
|
import org.apache.hadoop.security.AccessControlException;
|
|
|
import org.apache.hadoop.security.SaslRpcServer;
|
|
|
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
|
|
|
+import org.apache.hadoop.security.SaslRpcServer.SaslStatus;
|
|
|
import org.apache.hadoop.security.SaslRpcServer.SaslDigestCallbackHandler;
|
|
|
import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
@@ -74,6 +77,7 @@ import org.apache.hadoop.security.authorize.AuthorizationException;
|
|
|
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
|
|
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
|
|
import org.apache.hadoop.security.token.SecretManager;
|
|
|
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
|
|
@@ -757,11 +761,11 @@ public abstract class Server {
|
|
|
// Fake 'call' for failed authorization response
|
|
|
private static final int AUTHROIZATION_FAILED_CALLID = -1;
|
|
|
private final Call authFailedCall =
|
|
|
- new Call(AUTHROIZATION_FAILED_CALLID, null, null);
|
|
|
+ new Call(AUTHROIZATION_FAILED_CALLID, null, this);
|
|
|
private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
|
|
|
// Fake 'call' for SASL context setup
|
|
|
private static final int SASL_CALLID = -33;
|
|
|
- private final Call saslCall = new Call(SASL_CALLID, null, null);
|
|
|
+ private final Call saslCall = new Call(SASL_CALLID, null, this);
|
|
|
private final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream();
|
|
|
|
|
|
public Connection(SelectionKey key, SocketChannel channel,
|
|
@@ -843,68 +847,78 @@ public abstract class Server {
|
|
|
private void saslReadAndProcess(byte[] saslToken) throws IOException,
|
|
|
InterruptedException {
|
|
|
if (!saslContextEstablished) {
|
|
|
- if (saslServer == null) {
|
|
|
- switch (authMethod) {
|
|
|
- case DIGEST:
|
|
|
- saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
|
|
|
- .getMechanismName(), null, SaslRpcServer.SASL_DEFAULT_REALM,
|
|
|
- SaslRpcServer.SASL_PROPS, new SaslDigestCallbackHandler(
|
|
|
- secretManager, this));
|
|
|
- break;
|
|
|
- default:
|
|
|
- UserGroupInformation current = UserGroupInformation
|
|
|
- .getCurrentUser();
|
|
|
- String fullName = current.getUserName();
|
|
|
- if (LOG.isDebugEnabled())
|
|
|
- LOG.debug("Kerberos principal name is " + fullName);
|
|
|
- final String names[] = SaslRpcServer.splitKerberosName(fullName);
|
|
|
- if (names.length != 3) {
|
|
|
- throw new IOException(
|
|
|
- "Kerberos principal name does NOT have the expected "
|
|
|
- + "hostname part: " + fullName);
|
|
|
- }
|
|
|
- current.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
- @Override
|
|
|
- public Object run() throws IOException {
|
|
|
- saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
|
|
|
- .getMechanismName(), names[0], names[1],
|
|
|
- SaslRpcServer.SASL_PROPS, new SaslGssCallbackHandler());
|
|
|
- return null;
|
|
|
+ byte[] replyToken = null;
|
|
|
+ try {
|
|
|
+ if (saslServer == null) {
|
|
|
+ switch (authMethod) {
|
|
|
+ case DIGEST:
|
|
|
+ if (secretManager == null) {
|
|
|
+ throw new AccessControlException(
|
|
|
+ "Server is not configured to do DIGEST authentication.");
|
|
|
}
|
|
|
- });
|
|
|
+ saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
|
|
|
+ .getMechanismName(), null, SaslRpcServer.SASL_DEFAULT_REALM,
|
|
|
+ SaslRpcServer.SASL_PROPS, new SaslDigestCallbackHandler(
|
|
|
+ secretManager, this));
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ UserGroupInformation current = UserGroupInformation
|
|
|
+ .getCurrentUser();
|
|
|
+ String fullName = current.getUserName();
|
|
|
+ if (LOG.isDebugEnabled())
|
|
|
+ LOG.debug("Kerberos principal name is " + fullName);
|
|
|
+ final String names[] = SaslRpcServer.splitKerberosName(fullName);
|
|
|
+ if (names.length != 3) {
|
|
|
+ throw new AccessControlException(
|
|
|
+ "Kerberos principal name does NOT have the expected "
|
|
|
+ + "hostname part: " + fullName);
|
|
|
+ }
|
|
|
+ current.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
+ @Override
|
|
|
+ public Object run() throws SaslException {
|
|
|
+ saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
|
|
|
+ .getMechanismName(), names[0], names[1],
|
|
|
+ SaslRpcServer.SASL_PROPS, new SaslGssCallbackHandler());
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ if (saslServer == null)
|
|
|
+ throw new AccessControlException(
|
|
|
+ "Unable to find SASL server implementation for "
|
|
|
+ + authMethod.getMechanismName());
|
|
|
+ if (LOG.isDebugEnabled())
|
|
|
+ LOG.debug("Created SASL server with mechanism = "
|
|
|
+ + authMethod.getMechanismName());
|
|
|
}
|
|
|
- if (saslServer == null)
|
|
|
- throw new IOException(
|
|
|
- "Unable to find SASL server implementation for "
|
|
|
- + authMethod.getMechanismName());
|
|
|
if (LOG.isDebugEnabled())
|
|
|
- LOG.debug("Created SASL server with mechanism = "
|
|
|
- + authMethod.getMechanismName());
|
|
|
- }
|
|
|
- if (LOG.isDebugEnabled())
|
|
|
- LOG.debug("Have read input token of size " + saslToken.length
|
|
|
- + " for processing by saslServer.evaluateResponse()");
|
|
|
- byte[] replyToken;
|
|
|
- try {
|
|
|
+ LOG.debug("Have read input token of size " + saslToken.length
|
|
|
+ + " for processing by saslServer.evaluateResponse()");
|
|
|
replyToken = saslServer.evaluateResponse(saslToken);
|
|
|
- } catch (SaslException se) {
|
|
|
+ } catch (IOException e) {
|
|
|
+ IOException sendToClient = e;
|
|
|
+ Throwable cause = e;
|
|
|
+ while (cause != null) {
|
|
|
+ if (cause instanceof InvalidToken) {
|
|
|
+ sendToClient = (InvalidToken) cause;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ cause = cause.getCause();
|
|
|
+ }
|
|
|
+ doSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
|
|
|
+ sendToClient.getLocalizedMessage());
|
|
|
rpcMetrics.authenticationFailures.inc();
|
|
|
String clientIP = this.toString();
|
|
|
// attempting user could be null
|
|
|
- auditLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser, se);
|
|
|
- throw se;
|
|
|
+ auditLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser, e);
|
|
|
+ throw e;
|
|
|
}
|
|
|
if (replyToken != null) {
|
|
|
if (LOG.isDebugEnabled())
|
|
|
LOG.debug("Will send token of size " + replyToken.length
|
|
|
+ " from saslServer.");
|
|
|
- saslCall.connection = this;
|
|
|
- saslResponse.reset();
|
|
|
- DataOutputStream out = new DataOutputStream(saslResponse);
|
|
|
- out.writeInt(replyToken.length);
|
|
|
- out.write(replyToken, 0, replyToken.length);
|
|
|
- saslCall.setResponse(ByteBuffer.wrap(saslResponse.toByteArray()));
|
|
|
- responder.doRespond(saslCall);
|
|
|
+ doSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
|
|
|
+ null);
|
|
|
}
|
|
|
if (saslServer.isComplete()) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -927,6 +941,21 @@ public abstract class Server {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void doSaslReply(SaslStatus status, Writable rv,
|
|
|
+ String errorClass, String error) throws IOException {
|
|
|
+ saslResponse.reset();
|
|
|
+ DataOutputStream out = new DataOutputStream(saslResponse);
|
|
|
+ out.writeInt(status.state); // write status
|
|
|
+ if (status == SaslStatus.SUCCESS) {
|
|
|
+ rv.write(out);
|
|
|
+ } else {
|
|
|
+ WritableUtils.writeString(out, errorClass);
|
|
|
+ WritableUtils.writeString(out, error);
|
|
|
+ }
|
|
|
+ saslCall.setResponse(ByteBuffer.wrap(saslResponse.toByteArray()));
|
|
|
+ responder.doRespond(saslCall);
|
|
|
+ }
|
|
|
+
|
|
|
private void disposeSasl() {
|
|
|
if (saslServer != null) {
|
|
|
try {
|
|
@@ -936,15 +965,6 @@ public abstract class Server {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void askClientToUseSimpleAuth() throws IOException {
|
|
|
- saslCall.connection = this;
|
|
|
- saslResponse.reset();
|
|
|
- DataOutputStream out = new DataOutputStream(saslResponse);
|
|
|
- out.writeInt(SaslRpcServer.SWITCH_TO_SIMPLE_AUTH);
|
|
|
- saslCall.setResponse(ByteBuffer.wrap(saslResponse.toByteArray()));
|
|
|
- responder.doRespond(saslCall);
|
|
|
- }
|
|
|
-
|
|
|
public int readAndProcess() throws IOException, InterruptedException {
|
|
|
while (true) {
|
|
|
/* Read at most one RPC. If the header is not read completely yet
|
|
@@ -974,10 +994,16 @@ public abstract class Server {
|
|
|
throw new IOException("Unable to read authentication method");
|
|
|
}
|
|
|
if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
|
|
|
- throw new IOException("Authentication is required");
|
|
|
+ AccessControlException ae = new AccessControlException(
|
|
|
+ "Authentication is required");
|
|
|
+ setupResponse(authFailedResponse, authFailedCall, Status.FATAL,
|
|
|
+ null, ae.getClass().getName(), ae.getMessage());
|
|
|
+ responder.doRespond(authFailedCall);
|
|
|
+ throw ae;
|
|
|
}
|
|
|
if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
|
|
|
- askClientToUseSimpleAuth();
|
|
|
+ doSaslReply(SaslStatus.SUCCESS, new IntWritable(
|
|
|
+ SaslRpcServer.SWITCH_TO_SIMPLE_AUTH), null, null);
|
|
|
authMethod = AuthMethod.SIMPLE;
|
|
|
// client has already sent the initial Sasl message and we
|
|
|
// should ignore it. Both client and server should fall back
|
|
@@ -1159,7 +1185,6 @@ public abstract class Server {
|
|
|
rpcMetrics.authorizationSuccesses.inc();
|
|
|
} catch (AuthorizationException ae) {
|
|
|
rpcMetrics.authorizationFailures.inc();
|
|
|
- authFailedCall.connection = this;
|
|
|
setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null,
|
|
|
ae.getClass().getName(), ae.getMessage());
|
|
|
responder.doRespond(authFailedCall);
|
|
@@ -1387,6 +1412,11 @@ public abstract class Server {
|
|
|
this.isSecurityEnabled = false;
|
|
|
}
|
|
|
|
|
|
+ /** for unit testing only, should be called before server is started */
|
|
|
+ void enableSecurity() {
|
|
|
+ this.isSecurityEnabled = true;
|
|
|
+ }
|
|
|
+
|
|
|
/** Sets the socket buffer size used for responding to RPCs */
|
|
|
public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
|
|
|
|