|
@@ -1241,20 +1241,16 @@ public abstract class Server {
|
|
|
LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
|
|
|
throw ieo;
|
|
|
} catch (Exception e) {
|
|
|
- // Do not log WrappedRpcServerExceptionSuppressed.
|
|
|
- if (!(e instanceof WrappedRpcServerExceptionSuppressed)) {
|
|
|
- // A WrappedRpcServerException is an exception that has been sent
|
|
|
- // to the client, so the stacktrace is unnecessary; any other
|
|
|
- // exceptions are unexpected internal server errors and thus the
|
|
|
- // stacktrace should be logged.
|
|
|
- LOG.info(Thread.currentThread().getName() +
|
|
|
- ": readAndProcess from client " + c.getHostAddress() +
|
|
|
- " threw exception [" + e + "]",
|
|
|
- (e instanceof WrappedRpcServerException) ? null : e);
|
|
|
- }
|
|
|
+ // Any exceptions that reach here are fatal unexpected internal errors
|
|
|
+ // that could not be sent to the client.
|
|
|
+ LOG.info(Thread.currentThread().getName() +
|
|
|
+ ": readAndProcess from client " + c +
|
|
|
+ " threw exception [" + e + "]", e);
|
|
|
count = -1; //so that the (count < 0) block is executed
|
|
|
}
|
|
|
- if (count < 0) {
|
|
|
+ // setupResponse will signal the connection should be closed when a
|
|
|
+ // fatal response is sent.
|
|
|
+ if (count < 0 || c.shouldClose()) {
|
|
|
closeConnection(c);
|
|
|
c = null;
|
|
|
}
|
|
@@ -1582,16 +1578,20 @@ public abstract class Server {
|
|
|
* unnecessary stack trace logging if it's not an internal server error.
|
|
|
*/
|
|
|
@SuppressWarnings("serial")
|
|
|
- private static class WrappedRpcServerException extends RpcServerException {
|
|
|
+ private static class FatalRpcServerException extends RpcServerException {
|
|
|
private final RpcErrorCodeProto errCode;
|
|
|
- public WrappedRpcServerException(RpcErrorCodeProto errCode, IOException ioe) {
|
|
|
+ public FatalRpcServerException(RpcErrorCodeProto errCode, IOException ioe) {
|
|
|
super(ioe.toString(), ioe);
|
|
|
this.errCode = errCode;
|
|
|
}
|
|
|
- public WrappedRpcServerException(RpcErrorCodeProto errCode, String message) {
|
|
|
+ public FatalRpcServerException(RpcErrorCodeProto errCode, String message) {
|
|
|
this(errCode, new RpcServerException(message));
|
|
|
}
|
|
|
@Override
|
|
|
+ public RpcStatusProto getRpcStatusProto() {
|
|
|
+ return RpcStatusProto.FATAL;
|
|
|
+ }
|
|
|
+ @Override
|
|
|
public RpcErrorCodeProto getRpcErrorCodeProto() {
|
|
|
return errCode;
|
|
|
}
|
|
@@ -1601,19 +1601,6 @@ public abstract class Server {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * A WrappedRpcServerException that is suppressed altogether
|
|
|
- * for the purposes of logging.
|
|
|
- */
|
|
|
- @SuppressWarnings("serial")
|
|
|
- private static class WrappedRpcServerExceptionSuppressed
|
|
|
- extends WrappedRpcServerException {
|
|
|
- public WrappedRpcServerExceptionSuppressed(
|
|
|
- RpcErrorCodeProto errCode, IOException ioe) {
|
|
|
- super(errCode, ioe);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/** Reads calls from a connection and queues them for handling. */
|
|
|
public class Connection {
|
|
|
private boolean connectionHeaderRead = false; // connection header is read?
|
|
@@ -1645,7 +1632,8 @@ public abstract class Server {
|
|
|
private ByteBuffer unwrappedData;
|
|
|
private ByteBuffer unwrappedDataLengthBuffer;
|
|
|
private int serviceClass;
|
|
|
-
|
|
|
+ private boolean shouldClose = false;
|
|
|
+
|
|
|
UserGroupInformation user = null;
|
|
|
public UserGroupInformation attemptingUser = null; // user name before auth
|
|
|
|
|
@@ -1689,7 +1677,15 @@ public abstract class Server {
|
|
|
public String toString() {
|
|
|
return getHostAddress() + ":" + remotePort;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ boolean setShouldClose() {
|
|
|
+ return shouldClose = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean shouldClose() {
|
|
|
+ return shouldClose;
|
|
|
+ }
|
|
|
+
|
|
|
public String getHostAddress() {
|
|
|
return hostAddress;
|
|
|
}
|
|
@@ -1743,13 +1739,13 @@ public abstract class Server {
|
|
|
}
|
|
|
|
|
|
private void saslReadAndProcess(RpcWritable.Buffer buffer) throws
|
|
|
- WrappedRpcServerException, IOException, InterruptedException {
|
|
|
+ RpcServerException, IOException, InterruptedException {
|
|
|
final RpcSaslProto saslMessage =
|
|
|
getMessage(RpcSaslProto.getDefaultInstance(), buffer);
|
|
|
switch (saslMessage.getState()) {
|
|
|
case WRAP: {
|
|
|
if (!saslContextEstablished || !useWrap) {
|
|
|
- throw new WrappedRpcServerException(
|
|
|
+ throw new FatalRpcServerException(
|
|
|
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
|
|
|
new SaslException("Server is not wrapping data"));
|
|
|
}
|
|
@@ -1797,7 +1793,7 @@ public abstract class Server {
|
|
|
/**
|
|
|
* Process saslMessage and send saslResponse back
|
|
|
* @param saslMessage received SASL message
|
|
|
- * @throws WrappedRpcServerException setup failed due to SASL negotiation
|
|
|
+ * @throws RpcServerException setup failed due to SASL negotiation
|
|
|
* failure, premature or invalid connection context, or other state
|
|
|
* errors. This exception needs to be sent to the client. This
|
|
|
* exception will wrap {@link RetriableException},
|
|
@@ -1807,9 +1803,9 @@ public abstract class Server {
|
|
|
* @throws InterruptedException
|
|
|
*/
|
|
|
private void saslProcess(RpcSaslProto saslMessage)
|
|
|
- throws WrappedRpcServerException, IOException, InterruptedException {
|
|
|
+ throws RpcServerException, IOException, InterruptedException {
|
|
|
if (saslContextEstablished) {
|
|
|
- throw new WrappedRpcServerException(
|
|
|
+ throw new FatalRpcServerException(
|
|
|
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
|
|
|
new SaslException("Negotiation is already complete"));
|
|
|
}
|
|
@@ -1843,10 +1839,10 @@ public abstract class Server {
|
|
|
AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user);
|
|
|
saslContextEstablished = true;
|
|
|
}
|
|
|
- } catch (WrappedRpcServerException wrse) { // don't re-wrap
|
|
|
- throw wrse;
|
|
|
+ } catch (RpcServerException rse) { // don't re-wrap
|
|
|
+ throw rse;
|
|
|
} catch (IOException ioe) {
|
|
|
- throw new WrappedRpcServerException(
|
|
|
+ throw new FatalRpcServerException(
|
|
|
RpcErrorCodeProto.FATAL_UNAUTHORIZED, ioe);
|
|
|
}
|
|
|
// send back response if any, may throw IOException
|
|
@@ -1977,14 +1973,14 @@ public abstract class Server {
|
|
|
setupResponse(saslCall,
|
|
|
RpcStatusProto.SUCCESS, null,
|
|
|
RpcWritable.wrap(message), null, null);
|
|
|
- saslCall.sendResponse();
|
|
|
+ sendResponse(saslCall);
|
|
|
}
|
|
|
|
|
|
private void doSaslReply(Exception ioe) throws IOException {
|
|
|
setupResponse(authFailedCall,
|
|
|
RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_UNAUTHORIZED,
|
|
|
null, ioe.getClass().getName(), ioe.getLocalizedMessage());
|
|
|
- authFailedCall.sendResponse();
|
|
|
+ sendResponse(authFailedCall);
|
|
|
}
|
|
|
|
|
|
private void disposeSasl() {
|
|
@@ -2026,16 +2022,12 @@ public abstract class Server {
|
|
|
* rpc request length.
|
|
|
*
|
|
|
* @return -1 in case of error, else num bytes read so far
|
|
|
- * @throws WrappedRpcServerException - an exception that has already been
|
|
|
- * sent back to the client that does not require verbose logging
|
|
|
- * by the Listener thread
|
|
|
* @throws IOException - internal error that should not be returned to
|
|
|
* client, typically failure to respond to client
|
|
|
* @throws InterruptedException
|
|
|
*/
|
|
|
- public int readAndProcess()
|
|
|
- throws WrappedRpcServerException, IOException, InterruptedException {
|
|
|
- while (true) {
|
|
|
+ public int readAndProcess() throws IOException, InterruptedException {
|
|
|
+ while (!shouldClose()) { // stop if a fatal response has been sent.
|
|
|
// dataLengthBuffer is used to read "hrpc" or the rpc-packet length
|
|
|
int count = -1;
|
|
|
if (dataLengthBuffer.remaining() > 0) {
|
|
@@ -2101,9 +2093,10 @@ public abstract class Server {
|
|
|
if (data.remaining() == 0) {
|
|
|
dataLengthBuffer.clear(); // to read length of future rpc packets
|
|
|
data.flip();
|
|
|
+ ByteBuffer requestData = data;
|
|
|
+ data = null; // null out in case processOneRpc throws.
|
|
|
boolean isHeaderRead = connectionContextRead;
|
|
|
- processOneRpc(data);
|
|
|
- data = null;
|
|
|
+ processOneRpc(requestData);
|
|
|
// the last rpc-request we processed could have simply been the
|
|
|
// connectionContext; if so continue to read the first RPC.
|
|
|
if (!isHeaderRead) {
|
|
@@ -2112,6 +2105,7 @@ public abstract class Server {
|
|
|
}
|
|
|
return count;
|
|
|
}
|
|
|
+ return -1;
|
|
|
}
|
|
|
|
|
|
private AuthProtocol initializeAuthContext(int authType)
|
|
@@ -2194,14 +2188,14 @@ public abstract class Server {
|
|
|
setupResponse(fakeCall,
|
|
|
RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_VERSION_MISMATCH,
|
|
|
null, VersionMismatch.class.getName(), errMsg);
|
|
|
- fakeCall.sendResponse();
|
|
|
+ sendResponse(fakeCall);
|
|
|
} else if (clientVersion >= 3) {
|
|
|
RpcCall fakeCall = new RpcCall(this, -1);
|
|
|
// Versions 3 to 8 use older response
|
|
|
setupResponseOldVersionFatal(buffer, fakeCall,
|
|
|
null, VersionMismatch.class.getName(), errMsg);
|
|
|
|
|
|
- fakeCall.sendResponse();
|
|
|
+ sendResponse(fakeCall);
|
|
|
} else if (clientVersion == 2) { // Hadoop 0.18.3
|
|
|
RpcCall fakeCall = new RpcCall(this, 0);
|
|
|
DataOutputStream out = new DataOutputStream(buffer);
|
|
@@ -2210,7 +2204,7 @@ public abstract class Server {
|
|
|
WritableUtils.writeString(out, VersionMismatch.class.getName());
|
|
|
WritableUtils.writeString(out, errMsg);
|
|
|
fakeCall.setResponse(ByteBuffer.wrap(buffer.toByteArray()));
|
|
|
- fakeCall.sendResponse();
|
|
|
+ sendResponse(fakeCall);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -2218,18 +2212,18 @@ public abstract class Server {
|
|
|
RpcCall fakeCall = new RpcCall(this, 0);
|
|
|
fakeCall.setResponse(ByteBuffer.wrap(
|
|
|
RECEIVED_HTTP_REQ_RESPONSE.getBytes(StandardCharsets.UTF_8)));
|
|
|
- fakeCall.sendResponse();
|
|
|
+ sendResponse(fakeCall);
|
|
|
}
|
|
|
|
|
|
/** Reads the connection context following the connection header
|
|
|
- * @throws WrappedRpcServerException - if the header cannot be
|
|
|
+ * @throws RpcServerException - if the header cannot be
|
|
|
* deserialized, or the user is not authorized
|
|
|
*/
|
|
|
private void processConnectionContext(RpcWritable.Buffer buffer)
|
|
|
- throws WrappedRpcServerException {
|
|
|
+ throws RpcServerException {
|
|
|
// allow only one connection context during a session
|
|
|
if (connectionContextRead) {
|
|
|
- throw new WrappedRpcServerException(
|
|
|
+ throw new FatalRpcServerException(
|
|
|
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
|
|
|
"Connection context already processed");
|
|
|
}
|
|
@@ -2250,7 +2244,7 @@ public abstract class Server {
|
|
|
&& (!protocolUser.getUserName().equals(user.getUserName()))) {
|
|
|
if (authMethod == AuthMethod.TOKEN) {
|
|
|
// Not allowed to doAs if token authentication is used
|
|
|
- throw new WrappedRpcServerException(
|
|
|
+ throw new FatalRpcServerException(
|
|
|
RpcErrorCodeProto.FATAL_UNAUTHORIZED,
|
|
|
new AccessControlException("Authenticated user (" + user
|
|
|
+ ") doesn't match what the client claims to be ("
|
|
@@ -2278,13 +2272,10 @@ public abstract class Server {
|
|
|
* each embedded RPC request
|
|
|
* @param inBuf - SASL wrapped request of one or more RPCs
|
|
|
* @throws IOException - SASL packet cannot be unwrapped
|
|
|
- * @throws WrappedRpcServerException - an exception that has already been
|
|
|
- * sent back to the client that does not require verbose logging
|
|
|
- * by the Listener thread
|
|
|
* @throws InterruptedException
|
|
|
*/
|
|
|
private void unwrapPacketAndProcessRpcs(byte[] inBuf)
|
|
|
- throws WrappedRpcServerException, IOException, InterruptedException {
|
|
|
+ throws IOException, InterruptedException {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Have read input token of size " + inBuf.length
|
|
|
+ " for processing by saslServer.unwrap()");
|
|
@@ -2293,7 +2284,7 @@ public abstract class Server {
|
|
|
ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(
|
|
|
inBuf));
|
|
|
// Read all RPCs contained in the inBuf, even partial ones
|
|
|
- while (true) {
|
|
|
+ while (!shouldClose()) { // stop if a fatal response has been sent.
|
|
|
int count = -1;
|
|
|
if (unwrappedDataLengthBuffer.remaining() > 0) {
|
|
|
count = channelRead(ch, unwrappedDataLengthBuffer);
|
|
@@ -2314,8 +2305,9 @@ public abstract class Server {
|
|
|
if (unwrappedData.remaining() == 0) {
|
|
|
unwrappedDataLengthBuffer.clear();
|
|
|
unwrappedData.flip();
|
|
|
- processOneRpc(unwrappedData);
|
|
|
- unwrappedData = null;
|
|
|
+ ByteBuffer requestData = unwrappedData;
|
|
|
+ unwrappedData = null; // null out in case processOneRpc throws.
|
|
|
+ processOneRpc(requestData);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -2334,13 +2326,13 @@ public abstract class Server {
|
|
|
* @param bb - contains the RPC request header and the rpc request
|
|
|
* @throws IOException - internal error that should not be returned to
|
|
|
* client, typically failure to respond to client
|
|
|
- * @throws WrappedRpcServerException - an exception that is sent back to the
|
|
|
- * client in this method and does not require verbose logging by the
|
|
|
- * Listener thread
|
|
|
* @throws InterruptedException
|
|
|
*/
|
|
|
private void processOneRpc(ByteBuffer bb)
|
|
|
- throws IOException, WrappedRpcServerException, InterruptedException {
|
|
|
+ throws IOException, InterruptedException {
|
|
|
+ // exceptions that escape this method are fatal to the connection.
|
|
|
+ // setupResponse will use the rpc status to determine if the connection
|
|
|
+ // should be closed.
|
|
|
int callId = -1;
|
|
|
int retry = RpcConstants.INVALID_RETRY_COUNT;
|
|
|
try {
|
|
@@ -2357,40 +2349,47 @@ public abstract class Server {
|
|
|
if (callId < 0) { // callIds typically used during connection setup
|
|
|
processRpcOutOfBandRequest(header, buffer);
|
|
|
} else if (!connectionContextRead) {
|
|
|
- throw new WrappedRpcServerException(
|
|
|
+ throw new FatalRpcServerException(
|
|
|
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
|
|
|
"Connection context not established");
|
|
|
} else {
|
|
|
processRpcRequest(header, buffer);
|
|
|
}
|
|
|
- } catch (WrappedRpcServerException wrse) { // inform client of error
|
|
|
- Throwable ioe = wrse.getCause();
|
|
|
+ } catch (RpcServerException rse) {
|
|
|
+ // inform client of error, but do not rethrow else non-fatal
|
|
|
+ // exceptions will close connection!
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug(Thread.currentThread().getName() +
|
|
|
+ ": processOneRpc from client " + this +
|
|
|
+ " threw exception [" + rse + "]");
|
|
|
+ }
|
|
|
+ // use the wrapped exception if there is one.
|
|
|
+ Throwable t = (rse.getCause() != null) ? rse.getCause() : rse;
|
|
|
final RpcCall call = new RpcCall(this, callId, retry);
|
|
|
setupResponse(call,
|
|
|
- RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null,
|
|
|
- ioe.getClass().getName(), ioe.getMessage());
|
|
|
- call.sendResponse();
|
|
|
- throw wrse;
|
|
|
+ rse.getRpcStatusProto(), rse.getRpcErrorCodeProto(), null,
|
|
|
+ t.getClass().getName(), t.getMessage());
|
|
|
+ sendResponse(call);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Verify RPC header is valid
|
|
|
* @param header - RPC request header
|
|
|
- * @throws WrappedRpcServerException - header contains invalid values
|
|
|
+ * @throws RpcServerException - header contains invalid values
|
|
|
*/
|
|
|
private void checkRpcHeaders(RpcRequestHeaderProto header)
|
|
|
- throws WrappedRpcServerException {
|
|
|
+ throws RpcServerException {
|
|
|
if (!header.hasRpcOp()) {
|
|
|
String err = " IPC Server: No rpc op in rpcRequestHeader";
|
|
|
- throw new WrappedRpcServerException(
|
|
|
+ throw new FatalRpcServerException(
|
|
|
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
|
|
|
}
|
|
|
if (header.getRpcOp() !=
|
|
|
RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET) {
|
|
|
String err = "IPC Server does not implement rpc header operation" +
|
|
|
header.getRpcOp();
|
|
|
- throw new WrappedRpcServerException(
|
|
|
+ throw new FatalRpcServerException(
|
|
|
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
|
|
|
}
|
|
|
// If we know the rpc kind, get its class so that we can deserialize
|
|
@@ -2398,7 +2397,7 @@ public abstract class Server {
|
|
|
// we continue with this original design.
|
|
|
if (!header.hasRpcKind()) {
|
|
|
String err = " IPC Server: No rpc kind in rpcRequestHeader";
|
|
|
- throw new WrappedRpcServerException(
|
|
|
+ throw new FatalRpcServerException(
|
|
|
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
|
|
|
}
|
|
|
}
|
|
@@ -2411,13 +2410,15 @@ public abstract class Server {
|
|
|
* its response will be sent later when the request is processed.
|
|
|
* @param header - RPC request header
|
|
|
* @param buffer - stream to request payload
|
|
|
- * @throws WrappedRpcServerException - due to fatal rpc layer issues such
|
|
|
- * as invalid header or deserialization error. In this case a RPC fatal
|
|
|
- * status response will later be sent back to client.
|
|
|
+ * @throws RpcServerException - generally due to fatal rpc layer issues
|
|
|
+ * such as invalid header or deserialization error. The call queue
|
|
|
+ * may also throw a fatal or non-fatal exception on overflow.
|
|
|
+ * @throws IOException - fatal internal error that should/could not
|
|
|
+ * be sent to client.
|
|
|
* @throws InterruptedException
|
|
|
*/
|
|
|
private void processRpcRequest(RpcRequestHeaderProto header,
|
|
|
- RpcWritable.Buffer buffer) throws WrappedRpcServerException,
|
|
|
+ RpcWritable.Buffer buffer) throws RpcServerException,
|
|
|
InterruptedException {
|
|
|
Class<? extends Writable> rpcRequestClass =
|
|
|
getRpcRequestWrapper(header.getRpcKind());
|
|
@@ -2426,18 +2427,20 @@ public abstract class Server {
|
|
|
" from client " + getHostAddress());
|
|
|
final String err = "Unknown rpc kind in rpc header" +
|
|
|
header.getRpcKind();
|
|
|
- throw new WrappedRpcServerException(
|
|
|
- RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
|
|
|
+ throw new FatalRpcServerException(
|
|
|
+ RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
|
|
|
}
|
|
|
Writable rpcRequest;
|
|
|
try { //Read the rpc request
|
|
|
rpcRequest = buffer.newInstance(rpcRequestClass, conf);
|
|
|
+ } catch (RpcServerException rse) { // lets tests inject failures.
|
|
|
+ throw rse;
|
|
|
} catch (Throwable t) { // includes runtime exception from newInstance
|
|
|
LOG.warn("Unable to read call parameters for client " +
|
|
|
getHostAddress() + "on connection protocol " +
|
|
|
this.protocolName + " for rpcKind " + header.getRpcKind(), t);
|
|
|
String err = "IPC server unable to read call parameters: "+ t.getMessage();
|
|
|
- throw new WrappedRpcServerException(
|
|
|
+ throw new FatalRpcServerException(
|
|
|
RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err);
|
|
|
}
|
|
|
|
|
@@ -2476,7 +2479,7 @@ public abstract class Server {
|
|
|
try {
|
|
|
queueCall(call);
|
|
|
} catch (IOException ioe) {
|
|
|
- throw new WrappedRpcServerException(
|
|
|
+ throw new FatalRpcServerException(
|
|
|
RpcErrorCodeProto.ERROR_RPC_SERVER, ioe);
|
|
|
}
|
|
|
incRpcCount(); // Increment the rpc count
|
|
@@ -2487,7 +2490,7 @@ public abstract class Server {
|
|
|
* reading and authorizing the connection header
|
|
|
* @param header - RPC header
|
|
|
* @param buffer - stream to request payload
|
|
|
- * @throws WrappedRpcServerException - setup failed due to SASL
|
|
|
+ * @throws RpcServerException - setup failed due to SASL
|
|
|
* negotiation failure, premature or invalid connection context,
|
|
|
* or other state errors. This exception needs to be sent to the
|
|
|
* client.
|
|
@@ -2495,13 +2498,13 @@ public abstract class Server {
|
|
|
* @throws InterruptedException
|
|
|
*/
|
|
|
private void processRpcOutOfBandRequest(RpcRequestHeaderProto header,
|
|
|
- RpcWritable.Buffer buffer) throws WrappedRpcServerException,
|
|
|
+ RpcWritable.Buffer buffer) throws RpcServerException,
|
|
|
IOException, InterruptedException {
|
|
|
final int callId = header.getCallId();
|
|
|
if (callId == CONNECTION_CONTEXT_CALL_ID) {
|
|
|
// SASL must be established prior to connection context
|
|
|
if (authProtocol == AuthProtocol.SASL && !saslContextEstablished) {
|
|
|
- throw new WrappedRpcServerException(
|
|
|
+ throw new FatalRpcServerException(
|
|
|
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
|
|
|
"Connection header sent during SASL negotiation");
|
|
|
}
|
|
@@ -2510,7 +2513,7 @@ public abstract class Server {
|
|
|
} else if (callId == AuthProtocol.SASL.callId) {
|
|
|
// if client was switched to simple, ignore first SASL message
|
|
|
if (authProtocol != AuthProtocol.SASL) {
|
|
|
- throw new WrappedRpcServerException(
|
|
|
+ throw new FatalRpcServerException(
|
|
|
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
|
|
|
"SASL protocol not requested by client");
|
|
|
}
|
|
@@ -2518,7 +2521,7 @@ public abstract class Server {
|
|
|
} else if (callId == PING_CALL_ID) {
|
|
|
LOG.debug("Received ping message");
|
|
|
} else {
|
|
|
- throw new WrappedRpcServerException(
|
|
|
+ throw new FatalRpcServerException(
|
|
|
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
|
|
|
"Unknown out of band call #" + callId);
|
|
|
}
|
|
@@ -2526,9 +2529,9 @@ public abstract class Server {
|
|
|
|
|
|
/**
|
|
|
* Authorize proxy users to access this server
|
|
|
- * @throws WrappedRpcServerException - user is not allowed to proxy
|
|
|
+ * @throws RpcServerException - user is not allowed to proxy
|
|
|
*/
|
|
|
- private void authorizeConnection() throws WrappedRpcServerException {
|
|
|
+ private void authorizeConnection() throws RpcServerException {
|
|
|
try {
|
|
|
// If auth method is TOKEN, the token was obtained by the
|
|
|
// real user for the effective user, therefore not required to
|
|
@@ -2548,7 +2551,7 @@ public abstract class Server {
|
|
|
+ " for protocol " + connectionContext.getProtocol()
|
|
|
+ " is unauthorized for user " + user);
|
|
|
rpcMetrics.incrAuthorizationFailures();
|
|
|
- throw new WrappedRpcServerException(
|
|
|
+ throw new FatalRpcServerException(
|
|
|
RpcErrorCodeProto.FATAL_UNAUTHORIZED, ae);
|
|
|
}
|
|
|
}
|
|
@@ -2556,21 +2559,24 @@ public abstract class Server {
|
|
|
/**
|
|
|
* Decode the a protobuf from the given input stream
|
|
|
* @return Message - decoded protobuf
|
|
|
- * @throws WrappedRpcServerException - deserialization failed
|
|
|
+ * @throws RpcServerException - deserialization failed
|
|
|
*/
|
|
|
@SuppressWarnings("unchecked")
|
|
|
<T extends Message> T getMessage(Message message,
|
|
|
- RpcWritable.Buffer buffer) throws WrappedRpcServerException {
|
|
|
+ RpcWritable.Buffer buffer) throws RpcServerException {
|
|
|
try {
|
|
|
return (T)buffer.getValue(message);
|
|
|
} catch (Exception ioe) {
|
|
|
Class<?> protoClass = message.getClass();
|
|
|
- throw new WrappedRpcServerException(
|
|
|
+ throw new FatalRpcServerException(
|
|
|
RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST,
|
|
|
"Error decoding " + protoClass.getSimpleName() + ": "+ ioe);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // ipc reader threads should invoke this directly, whereas handlers
|
|
|
+ // must invoke call.sendResponse to allow lifecycle management of
|
|
|
+ // external, postponed, deferred calls, etc.
|
|
|
private void sendResponse(RpcCall call) throws IOException {
|
|
|
responder.doRespond(call);
|
|
|
}
|
|
@@ -2873,6 +2879,10 @@ public abstract class Server {
|
|
|
RpcCall call, RpcStatusProto status, RpcErrorCodeProto erCode,
|
|
|
Writable rv, String errorClass, String error)
|
|
|
throws IOException {
|
|
|
+ // fatal responses will cause the reader to close the connection.
|
|
|
+ if (status == RpcStatusProto.FATAL) {
|
|
|
+ call.connection.setShouldClose();
|
|
|
+ }
|
|
|
RpcResponseHeaderProto.Builder headerBuilder =
|
|
|
RpcResponseHeaderProto.newBuilder();
|
|
|
headerBuilder.setClientId(ByteString.copyFrom(call.clientId));
|