|
@@ -383,6 +383,11 @@ public abstract class Server {
|
|
|
return (call != null) ? call.getRemoteUser() : null;
|
|
|
}
|
|
|
|
|
|
+ public static String getProtocol() {
|
|
|
+ Call call = CurCall.get();
|
|
|
+ return (call != null) ? call.getProtocol() : null;
|
|
|
+ }
|
|
|
+
|
|
|
/** Return true if the invocation was through an RPC.
|
|
|
*/
|
|
|
public static boolean isRpcInvocation() {
|
|
@@ -671,6 +676,11 @@ public abstract class Server {
|
|
|
private int priorityLevel;
|
|
|
// the priority level assigned by scheduler, 0 by default
|
|
|
|
|
|
+ Call() {
|
|
|
+ this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT,
|
|
|
+ RPC.RpcKind.RPC_BUILTIN, RpcConstants.DUMMY_CLIENT_ID);
|
|
|
+ }
|
|
|
+
|
|
|
Call(Call call) {
|
|
|
this(call.callId, call.retryCount, call.rpcKind, call.clientId,
|
|
|
call.traceScope, call.callerContext);
|
|
@@ -702,6 +712,7 @@ public abstract class Server {
|
|
|
return "Call#" + callId + " Retry#" + retryCount;
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
public Void run() throws Exception {
|
|
|
return null;
|
|
|
}
|
|
@@ -717,6 +728,10 @@ public abstract class Server {
|
|
|
return (addr != null) ? addr.getHostAddress() : null;
|
|
|
}
|
|
|
|
|
|
+ public String getProtocol() {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Allow a IPC response to be postponed instead of sent immediately
|
|
|
* after the handler returns from the proxy method. The intended use
|
|
@@ -798,6 +813,11 @@ public abstract class Server {
|
|
|
this.rpcRequest = param;
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public String getProtocol() {
|
|
|
+ return "rpc";
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public UserGroupInformation getRemoteUser() {
|
|
|
return connection.user;
|
|
@@ -2250,33 +2270,15 @@ public abstract class Server {
|
|
|
// Save the priority level assignment by the scheduler
|
|
|
call.setPriorityLevel(callQueue.getPriorityLevel(call));
|
|
|
|
|
|
- if (callQueue.isClientBackoffEnabled()) {
|
|
|
- // if RPC queue is full, we will ask the RPC client to back off by
|
|
|
- // throwing RetriableException. Whether RPC client will honor
|
|
|
- // RetriableException and retry depends on client ipc retry policy.
|
|
|
- // For example, FailoverOnNetworkExceptionRetry handles
|
|
|
- // RetriableException.
|
|
|
- queueRequestOrAskClientToBackOff(call);
|
|
|
- } else {
|
|
|
- callQueue.put(call); // queue the call; maybe blocked here
|
|
|
+ try {
|
|
|
+ queueCall(call);
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ throw new WrappedRpcServerException(
|
|
|
+ RpcErrorCodeProto.ERROR_RPC_SERVER, ioe);
|
|
|
}
|
|
|
incRpcCount(); // Increment the rpc count
|
|
|
}
|
|
|
|
|
|
- private void queueRequestOrAskClientToBackOff(Call call)
|
|
|
- throws WrappedRpcServerException, InterruptedException {
|
|
|
- // If rpc scheduler indicates back off based on performance
|
|
|
- // degradation such as response time or rpc queue is full,
|
|
|
- // we will ask the client to back off.
|
|
|
- if (callQueue.shouldBackOff(call) || !callQueue.offer(call)) {
|
|
|
- rpcMetrics.incrClientBackoff();
|
|
|
- RetriableException retriableException =
|
|
|
- new RetriableException("Server is too busy.");
|
|
|
- throw new WrappedRpcServerExceptionSuppressed(
|
|
|
- RpcErrorCodeProto.ERROR_RPC_SERVER, retriableException);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Establish RPC connection setup by negotiating SASL if required, then
|
|
|
* reading and authorizing the connection header
|
|
@@ -2403,6 +2405,21 @@ public abstract class Server {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public void queueCall(Call call) throws IOException, InterruptedException {
|
|
|
+ if (!callQueue.isClientBackoffEnabled()) {
|
|
|
+ callQueue.put(call); // queue the call; maybe blocked here
|
|
|
+ } else if (callQueue.shouldBackOff(call) || !callQueue.offer(call)) {
|
|
|
+ // If rpc scheduler indicates back off based on performance degradation
|
|
|
+ // such as response time or rpc queue is full, we will ask the client
|
|
|
+ // to back off by throwing RetriableException. Whether the client will
|
|
|
+ // honor RetriableException and retry depends the client and its policy.
|
|
|
+ // For example, IPC clients using FailoverOnNetworkExceptionRetry handle
|
|
|
+ // RetriableException.
|
|
|
+ rpcMetrics.incrClientBackoff();
|
|
|
+ throw new RetriableException("Server is too busy.");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/** Handles queued calls . */
|
|
|
private class Handler extends Thread {
|
|
|
public Handler(int instanceNumber) {
|