|
@@ -32,6 +32,7 @@ import java.net.InetSocketAddress;
|
|
import java.nio.ByteBuffer;
|
|
import java.nio.ByteBuffer;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
+import java.util.Locale;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
|
|
|
import javax.security.auth.callback.Callback;
|
|
import javax.security.auth.callback.Callback;
|
|
@@ -81,6 +82,7 @@ import com.google.re2j.Pattern;
|
|
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
|
|
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
|
|
@InterfaceStability.Evolving
|
|
@InterfaceStability.Evolving
|
|
public class SaslRpcClient {
|
|
public class SaslRpcClient {
|
|
|
|
+ // This log is public as it is referenced in tests
|
|
public static final Log LOG = LogFactory.getLog(SaslRpcClient.class);
|
|
public static final Log LOG = LogFactory.getLog(SaslRpcClient.class);
|
|
|
|
|
|
private final UserGroupInformation ugi;
|
|
private final UserGroupInformation ugi;
|
|
@@ -98,7 +100,7 @@ public class SaslRpcClient {
|
|
RpcConstants.INVALID_RETRY_COUNT, RpcConstants.DUMMY_CLIENT_ID);
|
|
RpcConstants.INVALID_RETRY_COUNT, RpcConstants.DUMMY_CLIENT_ID);
|
|
private static final RpcSaslProto negotiateRequest =
|
|
private static final RpcSaslProto negotiateRequest =
|
|
RpcSaslProto.newBuilder().setState(SaslState.NEGOTIATE).build();
|
|
RpcSaslProto.newBuilder().setState(SaslState.NEGOTIATE).build();
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Create a SaslRpcClient that can be used by a RPC client to negotiate
|
|
* Create a SaslRpcClient that can be used by a RPC client to negotiate
|
|
* SASL authentication with a RPC server
|
|
* SASL authentication with a RPC server
|
|
@@ -121,7 +123,6 @@ public class SaslRpcClient {
|
|
public Object getNegotiatedProperty(String key) {
|
|
public Object getNegotiatedProperty(String key) {
|
|
return (saslClient != null) ? saslClient.getNegotiatedProperty(key) : null;
|
|
return (saslClient != null) ? saslClient.getNegotiatedProperty(key) : null;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
|
|
// the RPC Client has an inelegant way of handling expiration of TGTs
|
|
// the RPC Client has an inelegant way of handling expiration of TGTs
|
|
// acquired via a keytab. any connection failure causes a relogin, so
|
|
// acquired via a keytab. any connection failure causes a relogin, so
|
|
@@ -133,7 +134,7 @@ public class SaslRpcClient {
|
|
public AuthMethod getAuthMethod() {
|
|
public AuthMethod getAuthMethod() {
|
|
return authMethod;
|
|
return authMethod;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Instantiate a sasl client for the first supported auth type in the
|
|
* Instantiate a sasl client for the first supported auth type in the
|
|
* given list. The auth type must be defined, enabled, and the user
|
|
* given list. The auth type must be defined, enabled, and the user
|
|
@@ -172,13 +173,12 @@ public class SaslRpcClient {
|
|
throw new AccessControlException(
|
|
throw new AccessControlException(
|
|
"Client cannot authenticate via:" + serverAuthMethods);
|
|
"Client cannot authenticate via:" + serverAuthMethods);
|
|
}
|
|
}
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
|
|
+ if (LOG.isDebugEnabled() && selectedAuthType != null) {
|
|
LOG.debug("Use " + selectedAuthType.getMethod() +
|
|
LOG.debug("Use " + selectedAuthType.getMethod() +
|
|
" authentication for protocol " + protocol.getSimpleName());
|
|
" authentication for protocol " + protocol.getSimpleName());
|
|
}
|
|
}
|
|
return selectedAuthType;
|
|
return selectedAuthType;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
|
|
private boolean isValidAuthType(SaslAuth authType) {
|
|
private boolean isValidAuthType(SaslAuth authType) {
|
|
AuthMethod authMethod;
|
|
AuthMethod authMethod;
|
|
@@ -190,8 +190,8 @@ public class SaslRpcClient {
|
|
// do we know what it is? is it using our mechanism?
|
|
// do we know what it is? is it using our mechanism?
|
|
return authMethod != null &&
|
|
return authMethod != null &&
|
|
authMethod.getMechanismName().equals(authType.getMechanism());
|
|
authMethod.getMechanismName().equals(authType.getMechanism());
|
|
- }
|
|
|
|
-
|
|
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Try to create a SaslClient for an authentication type. May return
|
|
* Try to create a SaslClient for an authentication type. May return
|
|
* null if the type isn't supported or the client lacks the required
|
|
* null if the type isn't supported or the client lacks the required
|
|
@@ -218,7 +218,9 @@ public class SaslRpcClient {
|
|
case TOKEN: {
|
|
case TOKEN: {
|
|
Token<?> token = getServerToken(authType);
|
|
Token<?> token = getServerToken(authType);
|
|
if (token == null) {
|
|
if (token == null) {
|
|
- return null; // tokens aren't supported or user doesn't have one
|
|
|
|
|
|
+ LOG.debug("tokens aren't supported for this protocol" +
|
|
|
|
+ " or user doesn't have one");
|
|
|
|
+ return null;
|
|
}
|
|
}
|
|
saslCallback = new SaslClientCallbackHandler(token);
|
|
saslCallback = new SaslClientCallbackHandler(token);
|
|
break;
|
|
break;
|
|
@@ -226,11 +228,13 @@ public class SaslRpcClient {
|
|
case KERBEROS: {
|
|
case KERBEROS: {
|
|
if (ugi.getRealAuthenticationMethod().getAuthMethod() !=
|
|
if (ugi.getRealAuthenticationMethod().getAuthMethod() !=
|
|
AuthMethod.KERBEROS) {
|
|
AuthMethod.KERBEROS) {
|
|
- return null; // client isn't using kerberos
|
|
|
|
|
|
+ LOG.debug("client isn't using kerberos");
|
|
|
|
+ return null;
|
|
}
|
|
}
|
|
String serverPrincipal = getServerPrincipal(authType);
|
|
String serverPrincipal = getServerPrincipal(authType);
|
|
if (serverPrincipal == null) {
|
|
if (serverPrincipal == null) {
|
|
- return null; // protocol doesn't use kerberos
|
|
|
|
|
|
+ LOG.debug("protocol doesn't use kerberos");
|
|
|
|
+ return null;
|
|
}
|
|
}
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug("RPC Server's Kerberos principal name for protocol="
|
|
LOG.debug("RPC Server's Kerberos principal name for protocol="
|
|
@@ -241,7 +245,7 @@ public class SaslRpcClient {
|
|
default:
|
|
default:
|
|
throw new IOException("Unknown authentication method " + method);
|
|
throw new IOException("Unknown authentication method " + method);
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
String mechanism = method.getMechanismName();
|
|
String mechanism = method.getMechanismName();
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug("Creating SASL " + mechanism + "(" + method + ") "
|
|
LOG.debug("Creating SASL " + mechanism + "(" + method + ") "
|
|
@@ -251,32 +255,30 @@ public class SaslRpcClient {
|
|
new String[] { mechanism }, saslUser, saslProtocol, saslServerName,
|
|
new String[] { mechanism }, saslUser, saslProtocol, saslServerName,
|
|
saslProperties, saslCallback);
|
|
saslProperties, saslCallback);
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Try to locate the required token for the server.
|
|
* Try to locate the required token for the server.
|
|
*
|
|
*
|
|
* @param authType of the SASL client
|
|
* @param authType of the SASL client
|
|
- * @return Token<?> for server, or null if no token available
|
|
|
|
|
|
+ * @return Token for server, or null if no token available
|
|
* @throws IOException - token selector cannot be instantiated
|
|
* @throws IOException - token selector cannot be instantiated
|
|
*/
|
|
*/
|
|
private Token<?> getServerToken(SaslAuth authType) throws IOException {
|
|
private Token<?> getServerToken(SaslAuth authType) throws IOException {
|
|
TokenInfo tokenInfo = SecurityUtil.getTokenInfo(protocol, conf);
|
|
TokenInfo tokenInfo = SecurityUtil.getTokenInfo(protocol, conf);
|
|
- LOG.debug("Get token info proto:"+protocol+" info:"+tokenInfo);
|
|
|
|
|
|
+ LOG.debug("Get token info proto:" + protocol + " info:" + tokenInfo);
|
|
if (tokenInfo == null) { // protocol has no support for tokens
|
|
if (tokenInfo == null) { // protocol has no support for tokens
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
TokenSelector<?> tokenSelector = null;
|
|
TokenSelector<?> tokenSelector = null;
|
|
try {
|
|
try {
|
|
tokenSelector = tokenInfo.value().newInstance();
|
|
tokenSelector = tokenInfo.value().newInstance();
|
|
- } catch (InstantiationException e) {
|
|
|
|
- throw new IOException(e.toString());
|
|
|
|
- } catch (IllegalAccessException e) {
|
|
|
|
- throw new IOException(e.toString());
|
|
|
|
|
|
+ } catch (InstantiationException | IllegalAccessException e) {
|
|
|
|
+ throw new IOException(e.toString(), e);
|
|
}
|
|
}
|
|
return tokenSelector.selectToken(
|
|
return tokenSelector.selectToken(
|
|
SecurityUtil.buildTokenService(serverAddr), ugi.getTokens());
|
|
SecurityUtil.buildTokenService(serverAddr), ugi.getTokens());
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Get the remote server's principal. The value will be obtained from
|
|
* Get the remote server's principal. The value will be obtained from
|
|
* the config and cross-checked against the server's advertised principal.
|
|
* the config and cross-checked against the server's advertised principal.
|
|
@@ -288,7 +290,7 @@ public class SaslRpcClient {
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
String getServerPrincipal(SaslAuth authType) throws IOException {
|
|
String getServerPrincipal(SaslAuth authType) throws IOException {
|
|
KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf);
|
|
KerberosInfo krbInfo = SecurityUtil.getKerberosInfo(protocol, conf);
|
|
- LOG.debug("Get kerberos info proto:"+protocol+" info:"+krbInfo);
|
|
|
|
|
|
+ LOG.debug("Get kerberos info proto:" + protocol + " info:" + krbInfo);
|
|
if (krbInfo == null) { // protocol has no support for kerberos
|
|
if (krbInfo == null) { // protocol has no support for kerberos
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
@@ -336,7 +338,6 @@ public class SaslRpcClient {
|
|
}
|
|
}
|
|
return serverPrincipal;
|
|
return serverPrincipal;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
* Do client side SASL authentication with server via the given InputStream
|
|
* Do client side SASL authentication with server via the given InputStream
|
|
@@ -358,9 +359,9 @@ public class SaslRpcClient {
|
|
// redefined if/when a SASL negotiation starts, can be queried if the
|
|
// redefined if/when a SASL negotiation starts, can be queried if the
|
|
// negotiation fails
|
|
// negotiation fails
|
|
authMethod = AuthMethod.SIMPLE;
|
|
authMethod = AuthMethod.SIMPLE;
|
|
-
|
|
|
|
|
|
+
|
|
sendSaslMessage(outStream, negotiateRequest);
|
|
sendSaslMessage(outStream, negotiateRequest);
|
|
-
|
|
|
|
|
|
+
|
|
// loop until sasl is complete or a rpc error occurs
|
|
// loop until sasl is complete or a rpc error occurs
|
|
boolean done = false;
|
|
boolean done = false;
|
|
do {
|
|
do {
|
|
@@ -447,7 +448,7 @@ public class SaslRpcClient {
|
|
} while (!done);
|
|
} while (!done);
|
|
return authMethod;
|
|
return authMethod;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
private void sendSaslMessage(DataOutputStream out, RpcSaslProto message)
|
|
private void sendSaslMessage(DataOutputStream out, RpcSaslProto message)
|
|
throws IOException {
|
|
throws IOException {
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -459,7 +460,7 @@ public class SaslRpcClient {
|
|
request.write(out);
|
|
request.write(out);
|
|
out.flush();
|
|
out.flush();
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Evaluate the server provided challenge. The server must send a token
|
|
* Evaluate the server provided challenge. The server must send a token
|
|
* if it's not done. If the server is done, the challenge token is
|
|
* if it's not done. If the server is done, the challenge token is
|
|
@@ -494,7 +495,7 @@ public class SaslRpcClient {
|
|
}
|
|
}
|
|
return saslToken;
|
|
return saslToken;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
private RpcSaslProto.Builder createSaslReply(SaslState state,
|
|
private RpcSaslProto.Builder createSaslReply(SaslState state,
|
|
byte[] responseToken) {
|
|
byte[] responseToken) {
|
|
RpcSaslProto.Builder response = RpcSaslProto.newBuilder();
|
|
RpcSaslProto.Builder response = RpcSaslProto.newBuilder();
|
|
@@ -510,9 +511,9 @@ public class SaslRpcClient {
|
|
String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
|
|
String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
|
|
// SASL wrapping is only used if the connection has a QOP, and
|
|
// SASL wrapping is only used if the connection has a QOP, and
|
|
// the value is not auth. ex. auth-int & auth-priv
|
|
// the value is not auth. ex. auth-int & auth-priv
|
|
- return qop != null && !"auth".equalsIgnoreCase(qop);
|
|
|
|
|
|
+ return qop != null && !"auth".toLowerCase(Locale.ENGLISH).equals(qop);
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Get SASL wrapped InputStream if SASL QoP requires unwrapping,
|
|
* Get SASL wrapped InputStream if SASL QoP requires unwrapping,
|
|
* otherwise return original stream. Can be called only after
|
|
* otherwise return original stream. Can be called only after
|
|
@@ -534,8 +535,8 @@ public class SaslRpcClient {
|
|
* otherwise return original stream. Can be called only after
|
|
* otherwise return original stream. Can be called only after
|
|
* saslConnect() has been called.
|
|
* saslConnect() has been called.
|
|
*
|
|
*
|
|
- * @param in - InputStream used to make the connection
|
|
|
|
- * @return InputStream that may be using SASL unwrap
|
|
|
|
|
|
+ * @param out - OutputStream used to make the connection
|
|
|
|
+ * @return OutputStream that may be using wrapping
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
public OutputStream getOutputStream(OutputStream out) throws IOException {
|
|
public OutputStream getOutputStream(OutputStream out) throws IOException {
|
|
@@ -556,14 +557,14 @@ public class SaslRpcClient {
|
|
public WrappedInputStream(InputStream in) throws IOException {
|
|
public WrappedInputStream(InputStream in) throws IOException {
|
|
super(in);
|
|
super(in);
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public int read() throws IOException {
|
|
public int read() throws IOException {
|
|
byte[] b = new byte[1];
|
|
byte[] b = new byte[1];
|
|
int n = read(b, 0, 1);
|
|
int n = read(b, 0, 1);
|
|
return (n != -1) ? b[0] : -1;
|
|
return (n != -1) ? b[0] : -1;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public int read(byte b[]) throws IOException {
|
|
public int read(byte b[]) throws IOException {
|
|
return read(b, 0, b.length);
|
|
return read(b, 0, b.length);
|
|
@@ -580,7 +581,7 @@ public class SaslRpcClient {
|
|
unwrappedRpcBuffer.get(buf, off, readLen);
|
|
unwrappedRpcBuffer.get(buf, off, readLen);
|
|
return readLen;
|
|
return readLen;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
// all messages must be RPC SASL wrapped, else an exception is thrown
|
|
// all messages must be RPC SASL wrapped, else an exception is thrown
|
|
private void readNextRpcPacket() throws IOException {
|
|
private void readNextRpcPacket() throws IOException {
|
|
LOG.debug("reading next wrapped RPC packet");
|
|
LOG.debug("reading next wrapped RPC packet");
|
|
@@ -588,13 +589,13 @@ public class SaslRpcClient {
|
|
int rpcLen = dis.readInt();
|
|
int rpcLen = dis.readInt();
|
|
byte[] rpcBuf = new byte[rpcLen];
|
|
byte[] rpcBuf = new byte[rpcLen];
|
|
dis.readFully(rpcBuf);
|
|
dis.readFully(rpcBuf);
|
|
-
|
|
|
|
|
|
+
|
|
// decode the RPC header
|
|
// decode the RPC header
|
|
ByteArrayInputStream bis = new ByteArrayInputStream(rpcBuf);
|
|
ByteArrayInputStream bis = new ByteArrayInputStream(rpcBuf);
|
|
RpcResponseHeaderProto.Builder headerBuilder =
|
|
RpcResponseHeaderProto.Builder headerBuilder =
|
|
RpcResponseHeaderProto.newBuilder();
|
|
RpcResponseHeaderProto.newBuilder();
|
|
headerBuilder.mergeDelimitedFrom(bis);
|
|
headerBuilder.mergeDelimitedFrom(bis);
|
|
-
|
|
|
|
|
|
+
|
|
boolean isWrapped = false;
|
|
boolean isWrapped = false;
|
|
// Must be SASL wrapped, verify and decode.
|
|
// Must be SASL wrapped, verify and decode.
|
|
if (headerBuilder.getCallId() == AuthProtocol.SASL.callId) {
|
|
if (headerBuilder.getCallId() == AuthProtocol.SASL.callId) {
|
|
@@ -637,7 +638,7 @@ public class SaslRpcClient {
|
|
request.write(dob);
|
|
request.write(dob);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
/** Release resources used by wrapped saslClient */
|
|
/** Release resources used by wrapped saslClient */
|
|
public void dispose() throws SaslException {
|
|
public void dispose() throws SaslException {
|
|
if (saslClient != null) {
|
|
if (saslClient != null) {
|