|
@@ -99,6 +99,7 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.Rpc
|
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcSaslProto;
|
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcSaslProto;
|
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcSaslProto.SaslAuth;
|
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcSaslProto.SaslAuth;
|
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcSaslProto.SaslState;
|
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcSaslProto.SaslState;
|
|
|
|
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RPCTraceInfoProto;
|
|
import org.apache.hadoop.net.NetUtils;
|
|
import org.apache.hadoop.net.NetUtils;
|
|
import org.apache.hadoop.security.AccessControlException;
|
|
import org.apache.hadoop.security.AccessControlException;
|
|
import org.apache.hadoop.security.SaslPropertiesResolver;
|
|
import org.apache.hadoop.security.SaslPropertiesResolver;
|
|
@@ -118,10 +119,11 @@ import org.apache.hadoop.util.ExitUtil;
|
|
import org.apache.hadoop.util.ProtoUtil;
|
|
import org.apache.hadoop.util.ProtoUtil;
|
|
import org.apache.hadoop.util.StringUtils;
|
|
import org.apache.hadoop.util.StringUtils;
|
|
import org.apache.hadoop.util.Time;
|
|
import org.apache.hadoop.util.Time;
|
|
-import org.apache.htrace.core.SpanId;
|
|
|
|
-import org.apache.htrace.core.TraceScope;
|
|
|
|
-import org.apache.htrace.core.Tracer;
|
|
|
|
-
|
|
|
|
|
|
+import org.apache.hadoop.tracing.Span;
|
|
|
|
+import org.apache.hadoop.tracing.SpanContext;
|
|
|
|
+import org.apache.hadoop.tracing.TraceScope;
|
|
|
|
+import org.apache.hadoop.tracing.Tracer;
|
|
|
|
+import org.apache.hadoop.tracing.TraceUtils;
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
|
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
|
import org.apache.hadoop.thirdparty.protobuf.ByteString;
|
|
import org.apache.hadoop.thirdparty.protobuf.ByteString;
|
|
@@ -783,7 +785,7 @@ public abstract class Server {
|
|
private AtomicInteger responseWaitCount = new AtomicInteger(1);
|
|
private AtomicInteger responseWaitCount = new AtomicInteger(1);
|
|
final RPC.RpcKind rpcKind;
|
|
final RPC.RpcKind rpcKind;
|
|
final byte[] clientId;
|
|
final byte[] clientId;
|
|
- private final TraceScope traceScope; // the HTrace scope on the server side
|
|
|
|
|
|
+ private final Span span; // the trace span on the server side
|
|
private final CallerContext callerContext; // the call context
|
|
private final CallerContext callerContext; // the call context
|
|
private boolean deferredResponse = false;
|
|
private boolean deferredResponse = false;
|
|
private int priorityLevel;
|
|
private int priorityLevel;
|
|
@@ -798,7 +800,7 @@ public abstract class Server {
|
|
|
|
|
|
Call(Call call) {
|
|
Call(Call call) {
|
|
this(call.callId, call.retryCount, call.rpcKind, call.clientId,
|
|
this(call.callId, call.retryCount, call.rpcKind, call.clientId,
|
|
- call.traceScope, call.callerContext);
|
|
|
|
|
|
+ call.span, call.callerContext);
|
|
}
|
|
}
|
|
|
|
|
|
Call(int id, int retryCount, RPC.RpcKind kind, byte[] clientId) {
|
|
Call(int id, int retryCount, RPC.RpcKind kind, byte[] clientId) {
|
|
@@ -812,14 +814,14 @@ public abstract class Server {
|
|
}
|
|
}
|
|
|
|
|
|
Call(int id, int retryCount, RPC.RpcKind kind, byte[] clientId,
|
|
Call(int id, int retryCount, RPC.RpcKind kind, byte[] clientId,
|
|
- TraceScope traceScope, CallerContext callerContext) {
|
|
|
|
|
|
+ Span span, CallerContext callerContext) {
|
|
this.callId = id;
|
|
this.callId = id;
|
|
this.retryCount = retryCount;
|
|
this.retryCount = retryCount;
|
|
this.timestampNanos = Time.monotonicNowNanos();
|
|
this.timestampNanos = Time.monotonicNowNanos();
|
|
this.responseTimestampNanos = timestampNanos;
|
|
this.responseTimestampNanos = timestampNanos;
|
|
this.rpcKind = kind;
|
|
this.rpcKind = kind;
|
|
this.clientId = clientId;
|
|
this.clientId = clientId;
|
|
- this.traceScope = traceScope;
|
|
|
|
|
|
+ this.span = span;
|
|
this.callerContext = callerContext;
|
|
this.callerContext = callerContext;
|
|
this.clientStateId = Long.MIN_VALUE;
|
|
this.clientStateId = Long.MIN_VALUE;
|
|
this.isCallCoordinated = false;
|
|
this.isCallCoordinated = false;
|
|
@@ -988,8 +990,8 @@ public abstract class Server {
|
|
|
|
|
|
RpcCall(Connection connection, int id, int retryCount,
|
|
RpcCall(Connection connection, int id, int retryCount,
|
|
Writable param, RPC.RpcKind kind, byte[] clientId,
|
|
Writable param, RPC.RpcKind kind, byte[] clientId,
|
|
- TraceScope traceScope, CallerContext context) {
|
|
|
|
- super(id, retryCount, kind, clientId, traceScope, context);
|
|
|
|
|
|
+ Span span, CallerContext context) {
|
|
|
|
+ super(id, retryCount, kind, clientId, span, context);
|
|
this.connection = connection;
|
|
this.connection = connection;
|
|
this.rpcRequest = param;
|
|
this.rpcRequest = param;
|
|
}
|
|
}
|
|
@@ -2672,19 +2674,24 @@ public abstract class Server {
|
|
throw new FatalRpcServerException(
|
|
throw new FatalRpcServerException(
|
|
RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err);
|
|
RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err);
|
|
}
|
|
}
|
|
-
|
|
|
|
- TraceScope traceScope = null;
|
|
|
|
|
|
+
|
|
|
|
+ Span span = null;
|
|
if (header.hasTraceInfo()) {
|
|
if (header.hasTraceInfo()) {
|
|
- if (tracer != null) {
|
|
|
|
- // If the incoming RPC included tracing info, always continue the
|
|
|
|
- // trace
|
|
|
|
- SpanId parentSpanId = new SpanId(
|
|
|
|
- header.getTraceInfo().getTraceId(),
|
|
|
|
- header.getTraceInfo().getParentId());
|
|
|
|
- traceScope = tracer.newScope(
|
|
|
|
- RpcClientUtil.toTraceName(rpcRequest.toString()),
|
|
|
|
- parentSpanId);
|
|
|
|
- traceScope.detach();
|
|
|
|
|
|
+ RPCTraceInfoProto traceInfoProto = header.getTraceInfo();
|
|
|
|
+ if (traceInfoProto.hasSpanContext()) {
|
|
|
|
+ if (tracer == null) {
|
|
|
|
+ setTracer(Tracer.curThreadTracer());
|
|
|
|
+ }
|
|
|
|
+ if (tracer != null) {
|
|
|
|
+ // If the incoming RPC included tracing info, always continue the
|
|
|
|
+ // trace
|
|
|
|
+ SpanContext spanCtx = TraceUtils.byteStringToSpanContext(
|
|
|
|
+ traceInfoProto.getSpanContext());
|
|
|
|
+ if (spanCtx != null) {
|
|
|
|
+ span = tracer.newSpan(
|
|
|
|
+ RpcClientUtil.toTraceName(rpcRequest.toString()), spanCtx);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -2700,7 +2707,7 @@ public abstract class Server {
|
|
RpcCall call = new RpcCall(this, header.getCallId(),
|
|
RpcCall call = new RpcCall(this, header.getCallId(),
|
|
header.getRetryCount(), rpcRequest,
|
|
header.getRetryCount(), rpcRequest,
|
|
ProtoUtil.convert(header.getRpcKind()),
|
|
ProtoUtil.convert(header.getRpcKind()),
|
|
- header.getClientId().toByteArray(), traceScope, callerContext);
|
|
|
|
|
|
+ header.getClientId().toByteArray(), span, callerContext);
|
|
|
|
|
|
// Save the priority level assignment by the scheduler
|
|
// Save the priority level assignment by the scheduler
|
|
call.setPriorityLevel(callQueue.getPriorityLevel(call));
|
|
call.setPriorityLevel(callQueue.getPriorityLevel(call));
|
|
@@ -2953,10 +2960,9 @@ public abstract class Server {
|
|
LOG.debug(Thread.currentThread().getName() + ": " + call + " for RpcKind " + call.rpcKind);
|
|
LOG.debug(Thread.currentThread().getName() + ": " + call + " for RpcKind " + call.rpcKind);
|
|
}
|
|
}
|
|
CurCall.set(call);
|
|
CurCall.set(call);
|
|
- if (call.traceScope != null) {
|
|
|
|
- call.traceScope.reattach();
|
|
|
|
- traceScope = call.traceScope;
|
|
|
|
- traceScope.getSpan().addTimelineAnnotation("called");
|
|
|
|
|
|
+ if (call.span != null) {
|
|
|
|
+ traceScope = tracer.activateSpan(call.span);
|
|
|
|
+ call.span.addTimelineAnnotation("called");
|
|
}
|
|
}
|
|
// always update the current call context
|
|
// always update the current call context
|
|
CallerContext.setCurrent(call.callerContext);
|
|
CallerContext.setCurrent(call.callerContext);
|
|
@@ -2971,14 +2977,14 @@ public abstract class Server {
|
|
if (running) { // unexpected -- log it
|
|
if (running) { // unexpected -- log it
|
|
LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
|
|
LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
|
|
if (traceScope != null) {
|
|
if (traceScope != null) {
|
|
- traceScope.getSpan().addTimelineAnnotation("unexpectedly interrupted: " +
|
|
|
|
|
|
+ traceScope.addTimelineAnnotation("unexpectedly interrupted: " +
|
|
StringUtils.stringifyException(e));
|
|
StringUtils.stringifyException(e));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
LOG.info(Thread.currentThread().getName() + " caught an exception", e);
|
|
LOG.info(Thread.currentThread().getName() + " caught an exception", e);
|
|
if (traceScope != null) {
|
|
if (traceScope != null) {
|
|
- traceScope.getSpan().addTimelineAnnotation("Exception: " +
|
|
|
|
|
|
+ traceScope.addTimelineAnnotation("Exception: " +
|
|
StringUtils.stringifyException(e));
|
|
StringUtils.stringifyException(e));
|
|
}
|
|
}
|
|
} finally {
|
|
} finally {
|