|
@@ -26,7 +26,6 @@ import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
|
|
|
|
|
|
import java.io.ByteArrayInputStream;
|
|
import java.io.ByteArrayInputStream;
|
|
import java.io.ByteArrayOutputStream;
|
|
import java.io.ByteArrayOutputStream;
|
|
-import java.io.DataInputStream;
|
|
|
|
import java.io.DataOutputStream;
|
|
import java.io.DataOutputStream;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.lang.reflect.UndeclaredThrowableException;
|
|
import java.lang.reflect.UndeclaredThrowableException;
|
|
@@ -47,6 +46,7 @@ import java.nio.channels.Selector;
|
|
import java.nio.channels.ServerSocketChannel;
|
|
import java.nio.channels.ServerSocketChannel;
|
|
import java.nio.channels.SocketChannel;
|
|
import java.nio.channels.SocketChannel;
|
|
import java.nio.channels.WritableByteChannel;
|
|
import java.nio.channels.WritableByteChannel;
|
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
import java.security.PrivilegedExceptionAction;
|
|
import java.security.PrivilegedExceptionAction;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.Arrays;
|
|
import java.util.Arrays;
|
|
@@ -69,7 +69,6 @@ import javax.security.sasl.Sasl;
|
|
import javax.security.sasl.SaslException;
|
|
import javax.security.sasl.SaslException;
|
|
import javax.security.sasl.SaslServer;
|
|
import javax.security.sasl.SaslServer;
|
|
|
|
|
|
-import org.apache.commons.io.Charsets;
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
@@ -80,12 +79,9 @@ import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration.IntegerRanges;
|
|
import org.apache.hadoop.conf.Configuration.IntegerRanges;
|
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
-import org.apache.hadoop.io.DataOutputBuffer;
|
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
import org.apache.hadoop.io.IOUtils;
|
|
import org.apache.hadoop.io.Writable;
|
|
import org.apache.hadoop.io.Writable;
|
|
import org.apache.hadoop.io.WritableUtils;
|
|
import org.apache.hadoop.io.WritableUtils;
|
|
-import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper;
|
|
|
|
-import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
|
|
|
|
import org.apache.hadoop.ipc.RPC.RpcInvoker;
|
|
import org.apache.hadoop.ipc.RPC.RpcInvoker;
|
|
import org.apache.hadoop.ipc.RPC.VersionMismatch;
|
|
import org.apache.hadoop.ipc.RPC.VersionMismatch;
|
|
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
|
|
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
|
|
@@ -115,7 +111,6 @@ import org.apache.hadoop.security.token.SecretManager;
|
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
|
import org.apache.hadoop.util.ProtoUtil;
|
|
import org.apache.hadoop.util.ProtoUtil;
|
|
-import org.apache.hadoop.util.ReflectionUtils;
|
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
import org.apache.hadoop.util.StringUtils;
|
|
import org.apache.hadoop.util.Time;
|
|
import org.apache.hadoop.util.Time;
|
|
import org.apache.htrace.core.SpanId;
|
|
import org.apache.htrace.core.SpanId;
|
|
@@ -124,9 +119,7 @@ import org.apache.htrace.core.Tracer;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.protobuf.ByteString;
|
|
import com.google.protobuf.ByteString;
|
|
-import com.google.protobuf.CodedOutputStream;
|
|
|
|
import com.google.protobuf.Message;
|
|
import com.google.protobuf.Message;
|
|
-import com.google.protobuf.Message.Builder;
|
|
|
|
|
|
|
|
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
|
|
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
|
|
* parameter, and return a {@link Writable} as their value. A service runs on
|
|
* parameter, and return a {@link Writable} as their value. A service runs on
|
|
@@ -223,7 +216,7 @@ public abstract class Server {
|
|
* and send back a nicer response.
|
|
* and send back a nicer response.
|
|
*/
|
|
*/
|
|
private static final ByteBuffer HTTP_GET_BYTES = ByteBuffer.wrap(
|
|
private static final ByteBuffer HTTP_GET_BYTES = ByteBuffer.wrap(
|
|
- "GET ".getBytes(Charsets.UTF_8));
|
|
|
|
|
|
+ "GET ".getBytes(StandardCharsets.UTF_8));
|
|
|
|
|
|
/**
|
|
/**
|
|
* An HTTP response to send back if we detect an HTTP request to our IPC
|
|
* An HTTP response to send back if we detect an HTTP request to our IPC
|
|
@@ -423,6 +416,13 @@ public abstract class Server {
|
|
|
|
|
|
private int maxQueueSize;
|
|
private int maxQueueSize;
|
|
private final int maxRespSize;
|
|
private final int maxRespSize;
|
|
|
|
+ private final ThreadLocal<ResponseBuffer> responseBuffer =
|
|
|
|
+ new ThreadLocal<ResponseBuffer>(){
|
|
|
|
+ @Override
|
|
|
|
+ protected ResponseBuffer initialValue() {
|
|
|
|
+ return new ResponseBuffer(INITIAL_RESP_BUF_SIZE);
|
|
|
|
+ }
|
|
|
|
+ };
|
|
private int socketSendBufferSize;
|
|
private int socketSendBufferSize;
|
|
private final int maxDataLength;
|
|
private final int maxDataLength;
|
|
private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
|
private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
|
@@ -744,14 +744,7 @@ public abstract class Server {
|
|
public void abortResponse(Throwable t) throws IOException {
|
|
public void abortResponse(Throwable t) throws IOException {
|
|
// don't send response if the call was already sent or aborted.
|
|
// don't send response if the call was already sent or aborted.
|
|
if (responseWaitCount.getAndSet(-1) > 0) {
|
|
if (responseWaitCount.getAndSet(-1) > 0) {
|
|
- // clone the call to prevent a race with the other thread stomping
|
|
|
|
- // on the response while being sent. the original call is
|
|
|
|
- // effectively discarded since the wait count won't hit zero
|
|
|
|
- Call call = new Call(this);
|
|
|
|
- setupResponse(new ByteArrayOutputStream(), call,
|
|
|
|
- RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER,
|
|
|
|
- null, t.getClass().getName(), StringUtils.stringifyException(t));
|
|
|
|
- call.sendResponse();
|
|
|
|
|
|
+ connection.abortResponse(this, t);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1272,9 +1265,7 @@ public abstract class Server {
|
|
// must only wrap before adding to the responseQueue to prevent
|
|
// must only wrap before adding to the responseQueue to prevent
|
|
// postponed responses from being encrypted and sent out of order.
|
|
// postponed responses from being encrypted and sent out of order.
|
|
if (call.connection.useWrap) {
|
|
if (call.connection.useWrap) {
|
|
- ByteArrayOutputStream response = new ByteArrayOutputStream();
|
|
|
|
- wrapWithSasl(response, call);
|
|
|
|
- call.setResponse(ByteBuffer.wrap(response.toByteArray()));
|
|
|
|
|
|
+ wrapWithSasl(call);
|
|
}
|
|
}
|
|
call.connection.responseQueue.addLast(call);
|
|
call.connection.responseQueue.addLast(call);
|
|
if (call.connection.responseQueue.size() == 1) {
|
|
if (call.connection.responseQueue.size() == 1) {
|
|
@@ -1349,6 +1340,7 @@ public abstract class Server {
|
|
* A WrappedRpcServerException that is suppressed altogether
|
|
* A WrappedRpcServerException that is suppressed altogether
|
|
* for the purposes of logging.
|
|
* for the purposes of logging.
|
|
*/
|
|
*/
|
|
|
|
+ @SuppressWarnings("serial")
|
|
private static class WrappedRpcServerExceptionSuppressed
|
|
private static class WrappedRpcServerExceptionSuppressed
|
|
extends WrappedRpcServerException {
|
|
extends WrappedRpcServerException {
|
|
public WrappedRpcServerExceptionSuppressed(
|
|
public WrappedRpcServerExceptionSuppressed(
|
|
@@ -1395,8 +1387,7 @@ public abstract class Server {
|
|
// Fake 'call' for failed authorization response
|
|
// Fake 'call' for failed authorization response
|
|
private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALL_ID,
|
|
private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALL_ID,
|
|
RpcConstants.INVALID_RETRY_COUNT, null, this);
|
|
RpcConstants.INVALID_RETRY_COUNT, null, this);
|
|
- private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
|
|
|
|
-
|
|
|
|
|
|
+
|
|
private boolean sentNegotiate = false;
|
|
private boolean sentNegotiate = false;
|
|
private boolean useWrap = false;
|
|
private boolean useWrap = false;
|
|
|
|
|
|
@@ -1482,10 +1473,10 @@ public abstract class Server {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private void saslReadAndProcess(DataInputStream dis) throws
|
|
|
|
|
|
+ private void saslReadAndProcess(RpcWritable.Buffer buffer) throws
|
|
WrappedRpcServerException, IOException, InterruptedException {
|
|
WrappedRpcServerException, IOException, InterruptedException {
|
|
final RpcSaslProto saslMessage =
|
|
final RpcSaslProto saslMessage =
|
|
- decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis);
|
|
|
|
|
|
+ getMessage(RpcSaslProto.getDefaultInstance(), buffer);
|
|
switch (saslMessage.getState()) {
|
|
switch (saslMessage.getState()) {
|
|
case WRAP: {
|
|
case WRAP: {
|
|
if (!saslContextEstablished || !useWrap) {
|
|
if (!saslContextEstablished || !useWrap) {
|
|
@@ -1598,7 +1589,10 @@ public abstract class Server {
|
|
String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
|
|
String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
|
|
// SASL wrapping is only used if the connection has a QOP, and
|
|
// SASL wrapping is only used if the connection has a QOP, and
|
|
// the value is not auth. ex. auth-int & auth-priv
|
|
// the value is not auth. ex. auth-int & auth-priv
|
|
- useWrap = (qop != null && !"auth".equalsIgnoreCase(qop));
|
|
|
|
|
|
+ useWrap = (qop != null && !"auth".equalsIgnoreCase(qop));
|
|
|
|
+ if (!useWrap) {
|
|
|
|
+ disposeSasl();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1692,9 +1686,9 @@ public abstract class Server {
|
|
private void switchToSimple() {
|
|
private void switchToSimple() {
|
|
// disable SASL and blank out any SASL server
|
|
// disable SASL and blank out any SASL server
|
|
authProtocol = AuthProtocol.NONE;
|
|
authProtocol = AuthProtocol.NONE;
|
|
- saslServer = null;
|
|
|
|
|
|
+ disposeSasl();
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
private RpcSaslProto buildSaslResponse(SaslState state, byte[] replyToken) {
|
|
private RpcSaslProto buildSaslResponse(SaslState state, byte[] replyToken) {
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug("Will send " + state + " token of size "
|
|
LOG.debug("Will send " + state + " token of size "
|
|
@@ -1712,15 +1706,14 @@ public abstract class Server {
|
|
private void doSaslReply(Message message) throws IOException {
|
|
private void doSaslReply(Message message) throws IOException {
|
|
final Call saslCall = new Call(AuthProtocol.SASL.callId,
|
|
final Call saslCall = new Call(AuthProtocol.SASL.callId,
|
|
RpcConstants.INVALID_RETRY_COUNT, null, this);
|
|
RpcConstants.INVALID_RETRY_COUNT, null, this);
|
|
- final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream();
|
|
|
|
- setupResponse(saslResponse, saslCall,
|
|
|
|
|
|
+ setupResponse(saslCall,
|
|
RpcStatusProto.SUCCESS, null,
|
|
RpcStatusProto.SUCCESS, null,
|
|
- new RpcResponseWrapper(message), null, null);
|
|
|
|
|
|
+ RpcWritable.wrap(message), null, null);
|
|
saslCall.sendResponse();
|
|
saslCall.sendResponse();
|
|
}
|
|
}
|
|
|
|
|
|
private void doSaslReply(Exception ioe) throws IOException {
|
|
private void doSaslReply(Exception ioe) throws IOException {
|
|
- setupResponse(authFailedResponse, authFailedCall,
|
|
|
|
|
|
+ setupResponse(authFailedCall,
|
|
RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_UNAUTHORIZED,
|
|
RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_UNAUTHORIZED,
|
|
null, ioe.getClass().getName(), ioe.getLocalizedMessage());
|
|
null, ioe.getClass().getName(), ioe.getLocalizedMessage());
|
|
authFailedCall.sendResponse();
|
|
authFailedCall.sendResponse();
|
|
@@ -1731,6 +1724,8 @@ public abstract class Server {
|
|
try {
|
|
try {
|
|
saslServer.dispose();
|
|
saslServer.dispose();
|
|
} catch (SaslException ignored) {
|
|
} catch (SaslException ignored) {
|
|
|
|
+ } finally {
|
|
|
|
+ saslServer = null;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -1839,7 +1834,7 @@ public abstract class Server {
|
|
dataLengthBuffer.clear(); // to read length of future rpc packets
|
|
dataLengthBuffer.clear(); // to read length of future rpc packets
|
|
data.flip();
|
|
data.flip();
|
|
boolean isHeaderRead = connectionContextRead;
|
|
boolean isHeaderRead = connectionContextRead;
|
|
- processOneRpc(data.array());
|
|
|
|
|
|
+ processOneRpc(data);
|
|
data = null;
|
|
data = null;
|
|
// the last rpc-request we processed could have simply been the
|
|
// the last rpc-request we processed could have simply been the
|
|
// connectionContext; if so continue to read the first RPC.
|
|
// connectionContext; if so continue to read the first RPC.
|
|
@@ -1929,7 +1924,7 @@ public abstract class Server {
|
|
// Versions >>9 understand the normal response
|
|
// Versions >>9 understand the normal response
|
|
Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null,
|
|
Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null,
|
|
this);
|
|
this);
|
|
- setupResponse(buffer, fakeCall,
|
|
|
|
|
|
+ setupResponse(fakeCall,
|
|
RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_VERSION_MISMATCH,
|
|
RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_VERSION_MISMATCH,
|
|
null, VersionMismatch.class.getName(), errMsg);
|
|
null, VersionMismatch.class.getName(), errMsg);
|
|
fakeCall.sendResponse();
|
|
fakeCall.sendResponse();
|
|
@@ -1957,7 +1952,7 @@ public abstract class Server {
|
|
private void setupHttpRequestOnIpcPortResponse() throws IOException {
|
|
private void setupHttpRequestOnIpcPortResponse() throws IOException {
|
|
Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null, this);
|
|
Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null, this);
|
|
fakeCall.setResponse(ByteBuffer.wrap(
|
|
fakeCall.setResponse(ByteBuffer.wrap(
|
|
- RECEIVED_HTTP_REQ_RESPONSE.getBytes(Charsets.UTF_8)));
|
|
|
|
|
|
+ RECEIVED_HTTP_REQ_RESPONSE.getBytes(StandardCharsets.UTF_8)));
|
|
fakeCall.sendResponse();
|
|
fakeCall.sendResponse();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1966,7 +1961,7 @@ public abstract class Server {
|
|
* @throws WrappedRpcServerException - if the header cannot be
|
|
* @throws WrappedRpcServerException - if the header cannot be
|
|
* deserialized, or the user is not authorized
|
|
* deserialized, or the user is not authorized
|
|
*/
|
|
*/
|
|
- private void processConnectionContext(DataInputStream dis)
|
|
|
|
|
|
+ private void processConnectionContext(RpcWritable.Buffer buffer)
|
|
throws WrappedRpcServerException {
|
|
throws WrappedRpcServerException {
|
|
// allow only one connection context during a session
|
|
// allow only one connection context during a session
|
|
if (connectionContextRead) {
|
|
if (connectionContextRead) {
|
|
@@ -1974,13 +1969,12 @@ public abstract class Server {
|
|
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
|
|
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
|
|
"Connection context already processed");
|
|
"Connection context already processed");
|
|
}
|
|
}
|
|
- connectionContext = decodeProtobufFromStream(
|
|
|
|
- IpcConnectionContextProto.newBuilder(), dis);
|
|
|
|
|
|
+ connectionContext = getMessage(IpcConnectionContextProto.getDefaultInstance(), buffer);
|
|
protocolName = connectionContext.hasProtocol() ? connectionContext
|
|
protocolName = connectionContext.hasProtocol() ? connectionContext
|
|
.getProtocol() : null;
|
|
.getProtocol() : null;
|
|
|
|
|
|
UserGroupInformation protocolUser = ProtoUtil.getUgi(connectionContext);
|
|
UserGroupInformation protocolUser = ProtoUtil.getUgi(connectionContext);
|
|
- if (saslServer == null) {
|
|
|
|
|
|
+ if (authProtocol == AuthProtocol.NONE) {
|
|
user = protocolUser;
|
|
user = protocolUser;
|
|
} else {
|
|
} else {
|
|
// user is authenticated
|
|
// user is authenticated
|
|
@@ -2053,7 +2047,7 @@ public abstract class Server {
|
|
if (unwrappedData.remaining() == 0) {
|
|
if (unwrappedData.remaining() == 0) {
|
|
unwrappedDataLengthBuffer.clear();
|
|
unwrappedDataLengthBuffer.clear();
|
|
unwrappedData.flip();
|
|
unwrappedData.flip();
|
|
- processOneRpc(unwrappedData.array());
|
|
|
|
|
|
+ processOneRpc(unwrappedData);
|
|
unwrappedData = null;
|
|
unwrappedData = null;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -2077,36 +2071,35 @@ public abstract class Server {
|
|
* client in this method and does not require verbose logging by the
|
|
* client in this method and does not require verbose logging by the
|
|
* Listener thread
|
|
* Listener thread
|
|
* @throws InterruptedException
|
|
* @throws InterruptedException
|
|
- */
|
|
|
|
- private void processOneRpc(byte[] buf)
|
|
|
|
|
|
+ */
|
|
|
|
+ private void processOneRpc(ByteBuffer bb)
|
|
throws IOException, WrappedRpcServerException, InterruptedException {
|
|
throws IOException, WrappedRpcServerException, InterruptedException {
|
|
int callId = -1;
|
|
int callId = -1;
|
|
int retry = RpcConstants.INVALID_RETRY_COUNT;
|
|
int retry = RpcConstants.INVALID_RETRY_COUNT;
|
|
try {
|
|
try {
|
|
- final DataInputStream dis =
|
|
|
|
- new DataInputStream(new ByteArrayInputStream(buf));
|
|
|
|
|
|
+ final RpcWritable.Buffer buffer = RpcWritable.Buffer.wrap(bb);
|
|
final RpcRequestHeaderProto header =
|
|
final RpcRequestHeaderProto header =
|
|
- decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis);
|
|
|
|
|
|
+ getMessage(RpcRequestHeaderProto.getDefaultInstance(), buffer);
|
|
callId = header.getCallId();
|
|
callId = header.getCallId();
|
|
retry = header.getRetryCount();
|
|
retry = header.getRetryCount();
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug(" got #" + callId);
|
|
LOG.debug(" got #" + callId);
|
|
}
|
|
}
|
|
checkRpcHeaders(header);
|
|
checkRpcHeaders(header);
|
|
-
|
|
|
|
|
|
+
|
|
if (callId < 0) { // callIds typically used during connection setup
|
|
if (callId < 0) { // callIds typically used during connection setup
|
|
- processRpcOutOfBandRequest(header, dis);
|
|
|
|
|
|
+ processRpcOutOfBandRequest(header, buffer);
|
|
} else if (!connectionContextRead) {
|
|
} else if (!connectionContextRead) {
|
|
throw new WrappedRpcServerException(
|
|
throw new WrappedRpcServerException(
|
|
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
|
|
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
|
|
"Connection context not established");
|
|
"Connection context not established");
|
|
} else {
|
|
} else {
|
|
- processRpcRequest(header, dis);
|
|
|
|
|
|
+ processRpcRequest(header, buffer);
|
|
}
|
|
}
|
|
} catch (WrappedRpcServerException wrse) { // inform client of error
|
|
} catch (WrappedRpcServerException wrse) { // inform client of error
|
|
Throwable ioe = wrse.getCause();
|
|
Throwable ioe = wrse.getCause();
|
|
final Call call = new Call(callId, retry, null, this);
|
|
final Call call = new Call(callId, retry, null, this);
|
|
- setupResponse(authFailedResponse, call,
|
|
|
|
|
|
+ setupResponse(call,
|
|
RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null,
|
|
RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null,
|
|
ioe.getClass().getName(), ioe.getMessage());
|
|
ioe.getClass().getName(), ioe.getMessage());
|
|
call.sendResponse();
|
|
call.sendResponse();
|
|
@@ -2157,7 +2150,7 @@ public abstract class Server {
|
|
* @throws InterruptedException
|
|
* @throws InterruptedException
|
|
*/
|
|
*/
|
|
private void processRpcRequest(RpcRequestHeaderProto header,
|
|
private void processRpcRequest(RpcRequestHeaderProto header,
|
|
- DataInputStream dis) throws WrappedRpcServerException,
|
|
|
|
|
|
+ RpcWritable.Buffer buffer) throws WrappedRpcServerException,
|
|
InterruptedException {
|
|
InterruptedException {
|
|
Class<? extends Writable> rpcRequestClass =
|
|
Class<? extends Writable> rpcRequestClass =
|
|
getRpcRequestWrapper(header.getRpcKind());
|
|
getRpcRequestWrapper(header.getRpcKind());
|
|
@@ -2171,8 +2164,7 @@ public abstract class Server {
|
|
}
|
|
}
|
|
Writable rpcRequest;
|
|
Writable rpcRequest;
|
|
try { //Read the rpc request
|
|
try { //Read the rpc request
|
|
- rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
|
|
|
|
- rpcRequest.readFields(dis);
|
|
|
|
|
|
+ rpcRequest = buffer.newInstance(rpcRequestClass, conf);
|
|
} catch (Throwable t) { // includes runtime exception from newInstance
|
|
} catch (Throwable t) { // includes runtime exception from newInstance
|
|
LOG.warn("Unable to read call parameters for client " +
|
|
LOG.warn("Unable to read call parameters for client " +
|
|
getHostAddress() + "on connection protocol " +
|
|
getHostAddress() + "on connection protocol " +
|
|
@@ -2253,8 +2245,8 @@ public abstract class Server {
|
|
* @throws InterruptedException
|
|
* @throws InterruptedException
|
|
*/
|
|
*/
|
|
private void processRpcOutOfBandRequest(RpcRequestHeaderProto header,
|
|
private void processRpcOutOfBandRequest(RpcRequestHeaderProto header,
|
|
- DataInputStream dis) throws WrappedRpcServerException, IOException,
|
|
|
|
- InterruptedException {
|
|
|
|
|
|
+ RpcWritable.Buffer buffer) throws WrappedRpcServerException,
|
|
|
|
+ IOException, InterruptedException {
|
|
final int callId = header.getCallId();
|
|
final int callId = header.getCallId();
|
|
if (callId == CONNECTION_CONTEXT_CALL_ID) {
|
|
if (callId == CONNECTION_CONTEXT_CALL_ID) {
|
|
// SASL must be established prior to connection context
|
|
// SASL must be established prior to connection context
|
|
@@ -2264,7 +2256,7 @@ public abstract class Server {
|
|
"Connection header sent during SASL negotiation");
|
|
"Connection header sent during SASL negotiation");
|
|
}
|
|
}
|
|
// read and authorize the user
|
|
// read and authorize the user
|
|
- processConnectionContext(dis);
|
|
|
|
|
|
+ processConnectionContext(buffer);
|
|
} else if (callId == AuthProtocol.SASL.callId) {
|
|
} else if (callId == AuthProtocol.SASL.callId) {
|
|
// if client was switched to simple, ignore first SASL message
|
|
// if client was switched to simple, ignore first SASL message
|
|
if (authProtocol != AuthProtocol.SASL) {
|
|
if (authProtocol != AuthProtocol.SASL) {
|
|
@@ -2272,7 +2264,7 @@ public abstract class Server {
|
|
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
|
|
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
|
|
"SASL protocol not requested by client");
|
|
"SASL protocol not requested by client");
|
|
}
|
|
}
|
|
- saslReadAndProcess(dis);
|
|
|
|
|
|
+ saslReadAndProcess(buffer);
|
|
} else if (callId == PING_CALL_ID) {
|
|
} else if (callId == PING_CALL_ID) {
|
|
LOG.debug("Received ping message");
|
|
LOG.debug("Received ping message");
|
|
} else {
|
|
} else {
|
|
@@ -2319,13 +2311,12 @@ public abstract class Server {
|
|
* @throws WrappedRpcServerException - deserialization failed
|
|
* @throws WrappedRpcServerException - deserialization failed
|
|
*/
|
|
*/
|
|
@SuppressWarnings("unchecked")
|
|
@SuppressWarnings("unchecked")
|
|
- private <T extends Message> T decodeProtobufFromStream(Builder builder,
|
|
|
|
- DataInputStream dis) throws WrappedRpcServerException {
|
|
|
|
|
|
+ <T extends Message> T getMessage(Message message,
|
|
|
|
+ RpcWritable.Buffer buffer) throws WrappedRpcServerException {
|
|
try {
|
|
try {
|
|
- builder.mergeDelimitedFrom(dis);
|
|
|
|
- return (T)builder.build();
|
|
|
|
|
|
+ return (T)buffer.getValue(message);
|
|
} catch (Exception ioe) {
|
|
} catch (Exception ioe) {
|
|
- Class<?> protoClass = builder.getDefaultInstanceForType().getClass();
|
|
|
|
|
|
+ Class<?> protoClass = message.getClass();
|
|
throw new WrappedRpcServerException(
|
|
throw new WrappedRpcServerException(
|
|
RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST,
|
|
RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST,
|
|
"Error decoding " + protoClass.getSimpleName() + ": "+ ioe);
|
|
"Error decoding " + protoClass.getSimpleName() + ": "+ ioe);
|
|
@@ -2336,6 +2327,17 @@ public abstract class Server {
|
|
responder.doRespond(call);
|
|
responder.doRespond(call);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private void abortResponse(Call call, Throwable t) throws IOException {
|
|
|
|
+ // clone the call to prevent a race with the other thread stomping
|
|
|
|
+ // on the response while being sent. the original call is
|
|
|
|
+ // effectively discarded since the wait count won't hit zero
|
|
|
|
+ call = new Call(call);
|
|
|
|
+ setupResponse(call,
|
|
|
|
+ RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER,
|
|
|
|
+ null, t.getClass().getName(), StringUtils.stringifyException(t));
|
|
|
|
+ call.sendResponse();
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Get service class for connection
|
|
* Get service class for connection
|
|
* @return the serviceClass
|
|
* @return the serviceClass
|
|
@@ -2379,8 +2381,6 @@ public abstract class Server {
|
|
public void run() {
|
|
public void run() {
|
|
LOG.debug(Thread.currentThread().getName() + ": starting");
|
|
LOG.debug(Thread.currentThread().getName() + ": starting");
|
|
SERVER.set(Server.this);
|
|
SERVER.set(Server.this);
|
|
- ByteArrayOutputStream buf =
|
|
|
|
- new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
|
|
|
|
while (running) {
|
|
while (running) {
|
|
TraceScope traceScope = null;
|
|
TraceScope traceScope = null;
|
|
try {
|
|
try {
|
|
@@ -2450,16 +2450,8 @@ public abstract class Server {
|
|
}
|
|
}
|
|
CurCall.set(null);
|
|
CurCall.set(null);
|
|
synchronized (call.connection.responseQueue) {
|
|
synchronized (call.connection.responseQueue) {
|
|
- setupResponse(buf, call, returnStatus, detailedErr,
|
|
|
|
|
|
+ setupResponse(call, returnStatus, detailedErr,
|
|
value, errorClass, error);
|
|
value, errorClass, error);
|
|
-
|
|
|
|
- // Discard the large buf and reset it back to smaller size
|
|
|
|
- // to free up heap.
|
|
|
|
- if (buf.size() > maxRespSize) {
|
|
|
|
- LOG.warn("Large response size " + buf.size() + " for call "
|
|
|
|
- + call.toString());
|
|
|
|
- buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
|
|
|
|
- }
|
|
|
|
call.sendResponse();
|
|
call.sendResponse();
|
|
}
|
|
}
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
@@ -2677,13 +2669,11 @@ public abstract class Server {
|
|
* @param error error message, if the call failed
|
|
* @param error error message, if the call failed
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
- private static void setupResponse(ByteArrayOutputStream responseBuf,
|
|
|
|
- Call call, RpcStatusProto status, RpcErrorCodeProto erCode,
|
|
|
|
- Writable rv, String errorClass, String error)
|
|
|
|
- throws IOException {
|
|
|
|
- responseBuf.reset();
|
|
|
|
- DataOutputStream out = new DataOutputStream(responseBuf);
|
|
|
|
- RpcResponseHeaderProto.Builder headerBuilder =
|
|
|
|
|
|
+ private void setupResponse(
|
|
|
|
+ Call call, RpcStatusProto status, RpcErrorCodeProto erCode,
|
|
|
|
+ Writable rv, String errorClass, String error)
|
|
|
|
+ throws IOException {
|
|
|
|
+ RpcResponseHeaderProto.Builder headerBuilder =
|
|
RpcResponseHeaderProto.newBuilder();
|
|
RpcResponseHeaderProto.newBuilder();
|
|
headerBuilder.setClientId(ByteString.copyFrom(call.clientId));
|
|
headerBuilder.setClientId(ByteString.copyFrom(call.clientId));
|
|
headerBuilder.setCallId(call.callId);
|
|
headerBuilder.setCallId(call.callId);
|
|
@@ -2693,32 +2683,14 @@ public abstract class Server {
|
|
|
|
|
|
if (status == RpcStatusProto.SUCCESS) {
|
|
if (status == RpcStatusProto.SUCCESS) {
|
|
RpcResponseHeaderProto header = headerBuilder.build();
|
|
RpcResponseHeaderProto header = headerBuilder.build();
|
|
- final int headerLen = header.getSerializedSize();
|
|
|
|
- int fullLength = CodedOutputStream.computeRawVarint32Size(headerLen) +
|
|
|
|
- headerLen;
|
|
|
|
try {
|
|
try {
|
|
- if (rv instanceof ProtobufRpcEngine.RpcWrapper) {
|
|
|
|
- ProtobufRpcEngine.RpcWrapper resWrapper =
|
|
|
|
- (ProtobufRpcEngine.RpcWrapper) rv;
|
|
|
|
- fullLength += resWrapper.getLength();
|
|
|
|
- out.writeInt(fullLength);
|
|
|
|
- header.writeDelimitedTo(out);
|
|
|
|
- rv.write(out);
|
|
|
|
- } else { // Have to serialize to buffer to get len
|
|
|
|
- final DataOutputBuffer buf = new DataOutputBuffer();
|
|
|
|
- rv.write(buf);
|
|
|
|
- byte[] data = buf.getData();
|
|
|
|
- fullLength += buf.getLength();
|
|
|
|
- out.writeInt(fullLength);
|
|
|
|
- header.writeDelimitedTo(out);
|
|
|
|
- out.write(data, 0, buf.getLength());
|
|
|
|
- }
|
|
|
|
|
|
+ setupResponse(call, header, rv);
|
|
} catch (Throwable t) {
|
|
} catch (Throwable t) {
|
|
LOG.warn("Error serializing call response for call " + call, t);
|
|
LOG.warn("Error serializing call response for call " + call, t);
|
|
// Call back to same function - this is OK since the
|
|
// Call back to same function - this is OK since the
|
|
// buffer is reset at the top, and since status is changed
|
|
// buffer is reset at the top, and since status is changed
|
|
// to ERROR it won't infinite loop.
|
|
// to ERROR it won't infinite loop.
|
|
- setupResponse(responseBuf, call, RpcStatusProto.ERROR,
|
|
|
|
|
|
+ setupResponse(call, RpcStatusProto.ERROR,
|
|
RpcErrorCodeProto.ERROR_SERIALIZING_RESPONSE,
|
|
RpcErrorCodeProto.ERROR_SERIALIZING_RESPONSE,
|
|
null, t.getClass().getName(),
|
|
null, t.getClass().getName(),
|
|
StringUtils.stringifyException(t));
|
|
StringUtils.stringifyException(t));
|
|
@@ -2728,16 +2700,30 @@ public abstract class Server {
|
|
headerBuilder.setExceptionClassName(errorClass);
|
|
headerBuilder.setExceptionClassName(errorClass);
|
|
headerBuilder.setErrorMsg(error);
|
|
headerBuilder.setErrorMsg(error);
|
|
headerBuilder.setErrorDetail(erCode);
|
|
headerBuilder.setErrorDetail(erCode);
|
|
- RpcResponseHeaderProto header = headerBuilder.build();
|
|
|
|
- int headerLen = header.getSerializedSize();
|
|
|
|
- final int fullLength =
|
|
|
|
- CodedOutputStream.computeRawVarint32Size(headerLen) + headerLen;
|
|
|
|
- out.writeInt(fullLength);
|
|
|
|
- header.writeDelimitedTo(out);
|
|
|
|
|
|
+ setupResponse(call, headerBuilder.build(), null);
|
|
}
|
|
}
|
|
- call.setResponse(ByteBuffer.wrap(responseBuf.toByteArray()));
|
|
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ private void setupResponse(Call call,
|
|
|
|
+ RpcResponseHeaderProto header, Writable rv) throws IOException {
|
|
|
|
+ ResponseBuffer buf = responseBuffer.get().reset();
|
|
|
|
+ try {
|
|
|
|
+ RpcWritable.wrap(header).writeTo(buf);
|
|
|
|
+ if (rv != null) {
|
|
|
|
+ RpcWritable.wrap(rv).writeTo(buf);
|
|
|
|
+ }
|
|
|
|
+ call.setResponse(ByteBuffer.wrap(buf.toByteArray()));
|
|
|
|
+ } finally {
|
|
|
|
+ // Discard a large buf and reset it back to smaller size
|
|
|
|
+ // to free up heap.
|
|
|
|
+ if (buf.capacity() > maxRespSize) {
|
|
|
|
+ LOG.warn("Large response size " + buf.size() + " for call "
|
|
|
|
+ + call.toString());
|
|
|
|
+ buf.setCapacity(INITIAL_RESP_BUF_SIZE);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Setup response for the IPC Call on Fatal Error from a
|
|
* Setup response for the IPC Call on Fatal Error from a
|
|
* client that is using old version of Hadoop.
|
|
* client that is using old version of Hadoop.
|
|
@@ -2764,10 +2750,8 @@ public abstract class Server {
|
|
WritableUtils.writeString(out, error);
|
|
WritableUtils.writeString(out, error);
|
|
call.setResponse(ByteBuffer.wrap(response.toByteArray()));
|
|
call.setResponse(ByteBuffer.wrap(response.toByteArray()));
|
|
}
|
|
}
|
|
-
|
|
|
|
-
|
|
|
|
- private static void wrapWithSasl(ByteArrayOutputStream response, Call call)
|
|
|
|
- throws IOException {
|
|
|
|
|
|
+
|
|
|
|
+ private void wrapWithSasl(Call call) throws IOException {
|
|
if (call.connection.saslServer != null) {
|
|
if (call.connection.saslServer != null) {
|
|
byte[] token = call.rpcResponse.array();
|
|
byte[] token = call.rpcResponse.array();
|
|
// synchronization may be needed since there can be multiple Handler
|
|
// synchronization may be needed since there can be multiple Handler
|
|
@@ -2778,7 +2762,6 @@ public abstract class Server {
|
|
if (LOG.isDebugEnabled())
|
|
if (LOG.isDebugEnabled())
|
|
LOG.debug("Adding saslServer wrapped token of size " + token.length
|
|
LOG.debug("Adding saslServer wrapped token of size " + token.length
|
|
+ " as call response.");
|
|
+ " as call response.");
|
|
- response.reset();
|
|
|
|
// rebuild with sasl header and payload
|
|
// rebuild with sasl header and payload
|
|
RpcResponseHeaderProto saslHeader = RpcResponseHeaderProto.newBuilder()
|
|
RpcResponseHeaderProto saslHeader = RpcResponseHeaderProto.newBuilder()
|
|
.setCallId(AuthProtocol.SASL.callId)
|
|
.setCallId(AuthProtocol.SASL.callId)
|
|
@@ -2786,14 +2769,9 @@ public abstract class Server {
|
|
.build();
|
|
.build();
|
|
RpcSaslProto saslMessage = RpcSaslProto.newBuilder()
|
|
RpcSaslProto saslMessage = RpcSaslProto.newBuilder()
|
|
.setState(SaslState.WRAP)
|
|
.setState(SaslState.WRAP)
|
|
- .setToken(ByteString.copyFrom(token, 0, token.length))
|
|
|
|
|
|
+ .setToken(ByteString.copyFrom(token))
|
|
.build();
|
|
.build();
|
|
- RpcResponseMessageWrapper saslResponse =
|
|
|
|
- new RpcResponseMessageWrapper(saslHeader, saslMessage);
|
|
|
|
-
|
|
|
|
- DataOutputStream out = new DataOutputStream(response);
|
|
|
|
- out.writeInt(saslResponse.getLength());
|
|
|
|
- saslResponse.write(out);
|
|
|
|
|
|
+ setupResponse(call, saslHeader, RpcWritable.wrap(saslMessage));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|