|
@@ -18,6 +18,7 @@
|
|
|
|
|
|
package org.apache.hadoop.ipc;
|
|
|
|
|
|
+import static org.apache.hadoop.ipc.ProcessingDetails.Timing;
|
|
|
import static org.apache.hadoop.ipc.RpcConstants.AUTHORIZATION_FAILED_CALL_ID;
|
|
|
import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
|
|
|
import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION;
|
|
@@ -62,6 +63,7 @@ import java.util.TimerTask;
|
|
|
import java.util.concurrent.BlockingQueue;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
@@ -486,7 +488,7 @@ public abstract class Server {
|
|
|
* if and only if it falls above 99.7% of requests. We start this logic
|
|
|
* only once we have enough sample size.
|
|
|
*/
|
|
|
- void logSlowRpcCalls(String methodName, int processingTime) {
|
|
|
+ void logSlowRpcCalls(String methodName, Call call, long processingTime) {
|
|
|
final int deviation = 3;
|
|
|
|
|
|
// 1024 for minSampleSize just a guess -- not a number computed based on
|
|
@@ -499,27 +501,47 @@ public abstract class Server {
|
|
|
|
|
|
if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) &&
|
|
|
(processingTime > threeSigma)) {
|
|
|
- if(LOG.isWarnEnabled()) {
|
|
|
- String client = CurCall.get().toString();
|
|
|
- LOG.warn(
|
|
|
- "Slow RPC : " + methodName + " took " + processingTime +
|
|
|
- " milliseconds to process from client " + client);
|
|
|
- }
|
|
|
+ LOG.warn("Slow RPC : {} took {} {} to process from client {}",
|
|
|
+ methodName, processingTime, RpcMetrics.TIMEUNIT, call);
|
|
|
rpcMetrics.incrSlowRpc();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- void updateMetrics(String name, int queueTime, int processingTime,
|
|
|
- boolean deferredCall) {
|
|
|
+ void updateMetrics(Call call, long startTime, boolean connDropped) {
|
|
|
+ // delta = handler + processing + response
|
|
|
+ long deltaNanos = Time.monotonicNowNanos() - startTime;
|
|
|
+ long timestampNanos = call.timestampNanos;
|
|
|
+
|
|
|
+ ProcessingDetails details = call.getProcessingDetails();
|
|
|
+ // queue time is the delta between when the call first arrived and when it
|
|
|
+ // began being serviced, minus the time it took to be put into the queue
|
|
|
+ details.set(Timing.QUEUE,
|
|
|
+ startTime - timestampNanos - details.get(Timing.ENQUEUE));
|
|
|
+ deltaNanos -= details.get(Timing.PROCESSING);
|
|
|
+ deltaNanos -= details.get(Timing.RESPONSE);
|
|
|
+ details.set(Timing.HANDLER, deltaNanos);
|
|
|
+
|
|
|
+ long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT);
|
|
|
rpcMetrics.addRpcQueueTime(queueTime);
|
|
|
- if (!deferredCall) {
|
|
|
- rpcMetrics.addRpcProcessingTime(processingTime);
|
|
|
- rpcDetailedMetrics.addProcessingTime(name, processingTime);
|
|
|
- callQueue.addResponseTime(name, getPriorityLevel(), queueTime,
|
|
|
- processingTime);
|
|
|
- if (isLogSlowRPC()) {
|
|
|
- logSlowRpcCalls(name, processingTime);
|
|
|
- }
|
|
|
+
|
|
|
+ if (call.isResponseDeferred() || connDropped) {
|
|
|
+ // call was skipped; don't include it in processing metrics
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ long processingTime =
|
|
|
+ details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
|
|
|
+ long waitTime =
|
|
|
+ details.get(Timing.LOCKWAIT, RpcMetrics.TIMEUNIT);
|
|
|
+ rpcMetrics.addRpcLockWaitTime(waitTime);
|
|
|
+ rpcMetrics.addRpcProcessingTime(processingTime);
|
|
|
+ // don't include lock wait for detailed metrics.
|
|
|
+ processingTime -= waitTime;
|
|
|
+ String name = call.getDetailedMetricsName();
|
|
|
+ rpcDetailedMetrics.addProcessingTime(name, processingTime);
|
|
|
+ callQueue.addResponseTime(name, call, details);
|
|
|
+ if (isLogSlowRPC()) {
|
|
|
+ logSlowRpcCalls(name, call, processingTime);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -688,9 +710,13 @@ public abstract class Server {
|
|
|
/** A generic call queued for handling. */
|
|
|
public static class Call implements Schedulable,
|
|
|
PrivilegedExceptionAction<Void> {
|
|
|
+ private final ProcessingDetails processingDetails =
|
|
|
+ new ProcessingDetails(TimeUnit.NANOSECONDS);
|
|
|
+ // the method name to use in metrics
|
|
|
+ private volatile String detailedMetricsName = "";
|
|
|
final int callId; // the client's call id
|
|
|
final int retryCount; // the retry count of the call
|
|
|
- long timestamp; // time received when response is null
|
|
|
+ long timestampNanos; // time received when response is null
|
|
|
// time served when response is not null
|
|
|
private AtomicInteger responseWaitCount = new AtomicInteger(1);
|
|
|
final RPC.RpcKind rpcKind;
|
|
@@ -727,7 +753,7 @@ public abstract class Server {
|
|
|
TraceScope traceScope, CallerContext callerContext) {
|
|
|
this.callId = id;
|
|
|
this.retryCount = retryCount;
|
|
|
- this.timestamp = Time.now();
|
|
|
+ this.timestampNanos = Time.monotonicNowNanos();
|
|
|
this.rpcKind = kind;
|
|
|
this.clientId = clientId;
|
|
|
this.traceScope = traceScope;
|
|
@@ -736,6 +762,28 @@ public abstract class Server {
|
|
|
this.isCallCoordinated = false;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Indicates whether the call has been processed. Always true unless
|
|
|
+ * overridden.
|
|
|
+ *
|
|
|
+ * @return true
|
|
|
+ */
|
|
|
+ boolean isOpen() {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ String getDetailedMetricsName() {
|
|
|
+ return detailedMetricsName;
|
|
|
+ }
|
|
|
+
|
|
|
+ void setDetailedMetricsName(String name) {
|
|
|
+ detailedMetricsName = name;
|
|
|
+ }
|
|
|
+
|
|
|
+ public ProcessingDetails getProcessingDetails() {
|
|
|
+ return processingDetails;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public String toString() {
|
|
|
return "Call#" + callId + " Retry#" + retryCount;
|
|
@@ -883,6 +931,11 @@ public abstract class Server {
|
|
|
this.rpcRequest = param;
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ boolean isOpen() {
|
|
|
+ return connection.channel.isOpen();
|
|
|
+ }
|
|
|
+
|
|
|
void setResponseFields(Writable returnValue,
|
|
|
ResponseParams responseParams) {
|
|
|
this.rv = returnValue;
|
|
@@ -910,18 +963,33 @@ public abstract class Server {
|
|
|
Server.LOG.info(Thread.currentThread().getName() + ": skipped " + this);
|
|
|
return null;
|
|
|
}
|
|
|
+
|
|
|
+ long startNanos = Time.monotonicNowNanos();
|
|
|
Writable value = null;
|
|
|
ResponseParams responseParams = new ResponseParams();
|
|
|
|
|
|
try {
|
|
|
value = call(
|
|
|
- rpcKind, connection.protocolName, rpcRequest, timestamp);
|
|
|
+ rpcKind, connection.protocolName, rpcRequest, timestampNanos);
|
|
|
} catch (Throwable e) {
|
|
|
populateResponseParamsOnError(e, responseParams);
|
|
|
}
|
|
|
if (!isResponseDeferred()) {
|
|
|
+ long deltaNanos = Time.monotonicNowNanos() - startNanos;
|
|
|
+ ProcessingDetails details = getProcessingDetails();
|
|
|
+
|
|
|
+ details.set(Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS);
|
|
|
+ deltaNanos -= details.get(Timing.LOCKWAIT, TimeUnit.NANOSECONDS);
|
|
|
+ deltaNanos -= details.get(Timing.LOCKSHARED, TimeUnit.NANOSECONDS);
|
|
|
+ deltaNanos -= details.get(Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS);
|
|
|
+ details.set(Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS);
|
|
|
+ startNanos = Time.monotonicNowNanos();
|
|
|
+
|
|
|
setResponseFields(value, responseParams);
|
|
|
sendResponse();
|
|
|
+
|
|
|
+ deltaNanos = Time.monotonicNowNanos() - startNanos;
|
|
|
+ details.set(Timing.RESPONSE, deltaNanos, TimeUnit.NANOSECONDS);
|
|
|
} else {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Deferring response for callId: " + this.callId);
|
|
@@ -1341,12 +1409,13 @@ public abstract class Server {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private final static long PURGE_INTERVAL_NANOS = TimeUnit.NANOSECONDS.convert(
|
|
|
+ 15, TimeUnit.MINUTES);
|
|
|
+
|
|
|
// Sends responses of RPC back to clients.
|
|
|
private class Responder extends Thread {
|
|
|
private final Selector writeSelector;
|
|
|
private int pending; // connections waiting to register
|
|
|
-
|
|
|
- final static int PURGE_INTERVAL = 900000; // 15mins
|
|
|
|
|
|
Responder() throws IOException {
|
|
|
this.setName("IPC Server Responder");
|
|
@@ -1372,12 +1441,13 @@ public abstract class Server {
|
|
|
}
|
|
|
|
|
|
private void doRunLoop() {
|
|
|
- long lastPurgeTime = 0; // last check for old calls.
|
|
|
+ long lastPurgeTimeNanos = 0; // last check for old calls.
|
|
|
|
|
|
while (running) {
|
|
|
try {
|
|
|
waitPending(); // If a channel is being registered, wait.
|
|
|
- writeSelector.select(PURGE_INTERVAL);
|
|
|
+ writeSelector.select(
|
|
|
+ TimeUnit.NANOSECONDS.toMillis(PURGE_INTERVAL_NANOS));
|
|
|
Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
|
|
|
while (iter.hasNext()) {
|
|
|
SelectionKey key = iter.next();
|
|
@@ -1399,11 +1469,11 @@ public abstract class Server {
|
|
|
LOG.info(Thread.currentThread().getName() + ": doAsyncWrite threw exception " + e);
|
|
|
}
|
|
|
}
|
|
|
- long now = Time.now();
|
|
|
- if (now < lastPurgeTime + PURGE_INTERVAL) {
|
|
|
+ long nowNanos = Time.monotonicNowNanos();
|
|
|
+ if (nowNanos < lastPurgeTimeNanos + PURGE_INTERVAL_NANOS) {
|
|
|
continue;
|
|
|
}
|
|
|
- lastPurgeTime = now;
|
|
|
+ lastPurgeTimeNanos = nowNanos;
|
|
|
//
|
|
|
// If there were some calls that have not been sent out for a
|
|
|
// long time, discard them.
|
|
@@ -1427,7 +1497,7 @@ public abstract class Server {
|
|
|
}
|
|
|
|
|
|
for (RpcCall call : calls) {
|
|
|
- doPurge(call, now);
|
|
|
+ doPurge(call, nowNanos);
|
|
|
}
|
|
|
} catch (OutOfMemoryError e) {
|
|
|
//
|
|
@@ -1478,7 +1548,7 @@ public abstract class Server {
|
|
|
Iterator<RpcCall> iter = responseQueue.listIterator(0);
|
|
|
while (iter.hasNext()) {
|
|
|
call = iter.next();
|
|
|
- if (now > call.timestamp + PURGE_INTERVAL) {
|
|
|
+ if (now > call.timestampNanos + PURGE_INTERVAL_NANOS) {
|
|
|
closeConnection(call.connection);
|
|
|
break;
|
|
|
}
|
|
@@ -1542,7 +1612,7 @@ public abstract class Server {
|
|
|
|
|
|
if (inHandler) {
|
|
|
// set the serve time when the response has to be sent later
|
|
|
- call.timestamp = Time.now();
|
|
|
+ call.timestampNanos = Time.monotonicNowNanos();
|
|
|
|
|
|
incPending();
|
|
|
try {
|
|
@@ -2649,6 +2719,9 @@ public abstract class Server {
|
|
|
} else {
|
|
|
callQueue.add(call);
|
|
|
}
|
|
|
+ long deltaNanos = Time.monotonicNowNanos() - call.timestampNanos;
|
|
|
+ call.getProcessingDetails().set(Timing.ENQUEUE, deltaNanos,
|
|
|
+ TimeUnit.NANOSECONDS);
|
|
|
} catch (CallQueueOverflowException cqe) {
|
|
|
// If rpc scheduler indicates back off based on performance degradation
|
|
|
// such as response time or rpc queue is full, we will ask the client
|
|
@@ -2675,8 +2748,16 @@ public abstract class Server {
|
|
|
SERVER.set(Server.this);
|
|
|
while (running) {
|
|
|
TraceScope traceScope = null;
|
|
|
+ Call call = null;
|
|
|
+ long startTimeNanos = 0;
|
|
|
+ // True iff the connection for this call has been dropped.
|
|
|
+ // Set to true by default and update to false later if the connection
|
|
|
+ // can be succesfully read.
|
|
|
+ boolean connDropped = true;
|
|
|
+
|
|
|
try {
|
|
|
- final Call call = callQueue.take(); // pop the queue; maybe blocked here
|
|
|
+ call = callQueue.take(); // pop the queue; maybe blocked here
|
|
|
+ startTimeNanos = Time.monotonicNowNanos();
|
|
|
if (alignmentContext != null && call.isCallCoordinated() &&
|
|
|
call.getClientStateId() > alignmentContext.getLastSeenStateId()) {
|
|
|
/*
|
|
@@ -2707,6 +2788,7 @@ public abstract class Server {
|
|
|
// always update the current call context
|
|
|
CallerContext.setCurrent(call.callerContext);
|
|
|
UserGroupInformation remoteUser = call.getRemoteUser();
|
|
|
+ connDropped = !call.isOpen();
|
|
|
if (remoteUser != null) {
|
|
|
remoteUser.doAs(call);
|
|
|
} else {
|
|
@@ -2729,6 +2811,14 @@ public abstract class Server {
|
|
|
} finally {
|
|
|
CurCall.set(null);
|
|
|
IOUtils.cleanupWithLogger(LOG, traceScope);
|
|
|
+ if (call != null) {
|
|
|
+ updateMetrics(call, startTimeNanos, connDropped);
|
|
|
+ ProcessingDetails.LOG.debug(
|
|
|
+ "Served: [{}]{} name={} user={} details={}",
|
|
|
+ call, (call.isResponseDeferred() ? ", deferred" : ""),
|
|
|
+ call.getDetailedMetricsName(), call.getRemoteUser(),
|
|
|
+ call.getProcessingDetails());
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
LOG.debug(Thread.currentThread().getName() + ": exiting");
|