|
@@ -18,6 +18,11 @@
|
|
|
|
|
|
package org.apache.hadoop.ipc;
|
|
|
|
|
|
+import static org.apache.hadoop.ipc.RpcConstants.AUTHORIZATION_FAILED_CALL_ID;
|
|
|
+import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
|
|
|
+import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION;
|
|
|
+import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
|
|
|
+
|
|
|
import java.io.ByteArrayInputStream;
|
|
|
import java.io.ByteArrayOutputStream;
|
|
|
import java.io.DataInputStream;
|
|
@@ -75,8 +80,6 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
|
import org.apache.hadoop.io.DataOutputBuffer;
|
|
|
import org.apache.hadoop.io.Writable;
|
|
|
import org.apache.hadoop.io.WritableUtils;
|
|
|
-import static org.apache.hadoop.ipc.RpcConstants.*;
|
|
|
-
|
|
|
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper;
|
|
|
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
|
|
|
import org.apache.hadoop.ipc.RPC.RpcInvoker;
|
|
@@ -467,17 +470,24 @@ public abstract class Server {
|
|
|
return serviceAuthorizationManager;
|
|
|
}
|
|
|
|
|
|
+ static Class<? extends BlockingQueue<Call>> getQueueClass(
|
|
|
+ String prefix, Configuration conf) {
|
|
|
+ String name = prefix + "." + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY;
|
|
|
+ Class<?> queueClass = conf.getClass(name, LinkedBlockingQueue.class);
|
|
|
+ return CallQueueManager.convertQueueClass(queueClass, Call.class);
|
|
|
+ }
|
|
|
+
|
|
|
+ private String getQueueClassPrefix() {
|
|
|
+ return CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." + port;
|
|
|
+ }
|
|
|
+
|
|
|
/*
|
|
|
* Refresh the call queue
|
|
|
*/
|
|
|
public synchronized void refreshCallQueue(Configuration conf) {
|
|
|
// Create the next queue
|
|
|
- String prefix = CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." +
|
|
|
- this.port;
|
|
|
- Class queueClassToUse = conf.getClass(prefix + "." +
|
|
|
- CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY, LinkedBlockingQueue.class);
|
|
|
-
|
|
|
- callQueue.swapQueue(queueClassToUse, maxQueueSize, prefix, conf);
|
|
|
+ String prefix = getQueueClassPrefix();
|
|
|
+ callQueue.swapQueue(getQueueClass(prefix, conf), maxQueueSize, prefix, conf);
|
|
|
}
|
|
|
|
|
|
/** A call queued for handling. */
|
|
@@ -1225,9 +1235,9 @@ public abstract class Server {
|
|
|
Throwable cause = e;
|
|
|
while (cause != null) {
|
|
|
if (cause instanceof RetriableException) {
|
|
|
- return (RetriableException) cause;
|
|
|
+ return cause;
|
|
|
} else if (cause instanceof StandbyException) {
|
|
|
- return (StandbyException) cause;
|
|
|
+ return cause;
|
|
|
} else if (cause instanceof InvalidToken) {
|
|
|
// FIXME: hadoop method signatures are restricting the SASL
|
|
|
// callbacks to only returning InvalidToken, but some services
|
|
@@ -1297,7 +1307,7 @@ public abstract class Server {
|
|
|
|
|
|
private RpcSaslProto processSaslMessage(RpcSaslProto saslMessage)
|
|
|
throws IOException, InterruptedException {
|
|
|
- RpcSaslProto saslResponse = null;
|
|
|
+ final RpcSaslProto saslResponse;
|
|
|
final SaslState state = saslMessage.getState(); // required
|
|
|
switch (state) {
|
|
|
case NEGOTIATE: {
|
|
@@ -1333,27 +1343,18 @@ public abstract class Server {
|
|
|
// SIMPLE is a legit option above. we will send no response
|
|
|
if (authMethod == AuthMethod.SIMPLE) {
|
|
|
switchToSimple();
|
|
|
+ saslResponse = null;
|
|
|
break;
|
|
|
}
|
|
|
// sasl server for tokens may already be instantiated
|
|
|
if (saslServer == null || authMethod != AuthMethod.TOKEN) {
|
|
|
saslServer = createSaslServer(authMethod);
|
|
|
}
|
|
|
- // fallthru to process sasl token
|
|
|
+ saslResponse = processSaslToken(saslMessage);
|
|
|
+ break;
|
|
|
}
|
|
|
case RESPONSE: {
|
|
|
- if (!saslMessage.hasToken()) {
|
|
|
- throw new SaslException("Client did not send a token");
|
|
|
- }
|
|
|
- byte[] saslToken = saslMessage.getToken().toByteArray();
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Have read input token of size " + saslToken.length
|
|
|
- + " for processing by saslServer.evaluateResponse()");
|
|
|
- }
|
|
|
- saslToken = saslServer.evaluateResponse(saslToken);
|
|
|
- saslResponse = buildSaslResponse(
|
|
|
- saslServer.isComplete() ? SaslState.SUCCESS : SaslState.CHALLENGE,
|
|
|
- saslToken);
|
|
|
+ saslResponse = processSaslToken(saslMessage);
|
|
|
break;
|
|
|
}
|
|
|
default:
|
|
@@ -1362,6 +1363,22 @@ public abstract class Server {
|
|
|
return saslResponse;
|
|
|
}
|
|
|
|
|
|
+ private RpcSaslProto processSaslToken(RpcSaslProto saslMessage)
|
|
|
+ throws SaslException {
|
|
|
+ if (!saslMessage.hasToken()) {
|
|
|
+ throw new SaslException("Client did not send a token");
|
|
|
+ }
|
|
|
+ byte[] saslToken = saslMessage.getToken().toByteArray();
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Have read input token of size " + saslToken.length
|
|
|
+ + " for processing by saslServer.evaluateResponse()");
|
|
|
+ }
|
|
|
+ saslToken = saslServer.evaluateResponse(saslToken);
|
|
|
+ return buildSaslResponse(
|
|
|
+ saslServer.isComplete() ? SaslState.SUCCESS : SaslState.CHALLENGE,
|
|
|
+ saslToken);
|
|
|
+ }
|
|
|
+
|
|
|
private void switchToSimple() {
|
|
|
// disable SASL and blank out any SASL server
|
|
|
authProtocol = AuthProtocol.NONE;
|
|
@@ -2123,12 +2140,9 @@ public abstract class Server {
|
|
|
CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);
|
|
|
|
|
|
// Setup appropriate callqueue
|
|
|
- String prefix = CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." +
|
|
|
- this.port;
|
|
|
- Class queueClassToUse = conf.getClass(prefix + "." +
|
|
|
- CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY, LinkedBlockingQueue.class);
|
|
|
- this.callQueue = new CallQueueManager<Call>(queueClassToUse, maxQueueSize,
|
|
|
- prefix, conf);
|
|
|
+ final String prefix = getQueueClassPrefix();
|
|
|
+ this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf),
|
|
|
+ maxQueueSize, prefix, conf);
|
|
|
|
|
|
this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
|
|
|
this.authorize =
|