|
@@ -1339,7 +1339,7 @@ public abstract class Server {
|
|
|
+ CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION
|
|
|
+ ") is configured as simple. Please configure another method "
|
|
|
+ "like kerberos or digest.");
|
|
|
- setupResponse(authFailedResponse, authFailedCall, Status.FATAL,
|
|
|
+ setupResponse(authFailedResponse, authFailedCall, RpcStatusProto.FATAL,
|
|
|
null, ae.getClass().getName(), ae.getMessage());
|
|
|
responder.doRespond(authFailedCall);
|
|
|
throw ae;
|
|
@@ -1420,7 +1420,7 @@ public abstract class Server {
|
|
|
Call fakeCall = new Call(-1, null, this);
|
|
|
// Versions 3 and greater can interpret this exception
|
|
|
// response in the same manner
|
|
|
- setupResponse(buffer, fakeCall, Status.FATAL,
|
|
|
+ setupResponseOldVersionFatal(buffer, fakeCall,
|
|
|
null, VersionMismatch.class.getName(), errMsg);
|
|
|
|
|
|
responder.doRespond(fakeCall);
|
|
@@ -1443,7 +1443,7 @@ public abstract class Server {
|
|
|
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
|
|
|
|
|
|
Call fakeCall = new Call(-1, null, this);
|
|
|
- setupResponse(buffer, fakeCall, Status.FATAL, null,
|
|
|
+ setupResponse(buffer, fakeCall, RpcStatusProto.FATAL, null,
|
|
|
IpcException.class.getName(), errMsg);
|
|
|
responder.doRespond(fakeCall);
|
|
|
}
|
|
@@ -1579,7 +1579,7 @@ public abstract class Server {
|
|
|
new Call(header.getCallId(), null, this);
|
|
|
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
|
|
|
|
|
|
- setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
|
|
|
+ setupResponse(responseBuffer, readParamsFailedCall, RpcStatusProto.FATAL, null,
|
|
|
IOException.class.getName(),
|
|
|
"Unknown rpc kind " + header.getRpcKind());
|
|
|
responder.doRespond(readParamsFailedCall);
|
|
@@ -1597,7 +1597,7 @@ public abstract class Server {
|
|
|
new Call(header.getCallId(), null, this);
|
|
|
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
|
|
|
|
|
|
- setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
|
|
|
+ setupResponse(responseBuffer, readParamsFailedCall, RpcStatusProto.FATAL, null,
|
|
|
t.getClass().getName(),
|
|
|
"IPC server unable to read call parameters: " + t.getMessage());
|
|
|
responder.doRespond(readParamsFailedCall);
|
|
@@ -1627,7 +1627,7 @@ public abstract class Server {
|
|
|
rpcMetrics.incrAuthorizationSuccesses();
|
|
|
} catch (AuthorizationException ae) {
|
|
|
rpcMetrics.incrAuthorizationFailures();
|
|
|
- setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null,
|
|
|
+ setupResponse(authFailedResponse, authFailedCall, RpcStatusProto.FATAL, null,
|
|
|
ae.getClass().getName(), ae.getMessage());
|
|
|
responder.doRespond(authFailedCall);
|
|
|
return false;
|
|
@@ -1725,8 +1725,8 @@ public abstract class Server {
|
|
|
// responder.doResponse() since setupResponse may use
|
|
|
// SASL to encrypt response data and SASL enforces
|
|
|
// its own message ordering.
|
|
|
- setupResponse(buf, call, (error == null) ? Status.SUCCESS
|
|
|
- : Status.ERROR, value, errorClass, error);
|
|
|
+ setupResponse(buf, call, (error == null) ? RpcStatusProto.SUCCESS
|
|
|
+ : RpcStatusProto.ERROR, value, errorClass, error);
|
|
|
|
|
|
// Discard the large buf and reset it back to smaller size
|
|
|
// to free up heap
|
|
@@ -1859,40 +1859,79 @@ public abstract class Server {
|
|
|
/**
|
|
|
* Setup response for the IPC Call.
|
|
|
*
|
|
|
- * @param response buffer to serialize the response into
|
|
|
+ * @param responseBuf buffer to serialize the response into
|
|
|
* @param call {@link Call} to which we are setting up the response
|
|
|
- * @param status {@link Status} of the IPC call
|
|
|
+ * @param status of the IPC call
|
|
|
* @param rv return value for the IPC Call, if the call was successful
|
|
|
* @param errorClass error class, if the the call failed
|
|
|
* @param error error message, if the call failed
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- private void setupResponse(ByteArrayOutputStream response,
|
|
|
- Call call, Status status,
|
|
|
+ private void setupResponse(ByteArrayOutputStream responseBuf,
|
|
|
+ Call call, RpcStatusProto status,
|
|
|
Writable rv, String errorClass, String error)
|
|
|
throws IOException {
|
|
|
- response.reset();
|
|
|
- DataOutputStream out = new DataOutputStream(response);
|
|
|
- out.writeInt(call.callId); // write call id
|
|
|
- out.writeInt(status.state); // write status
|
|
|
+ responseBuf.reset();
|
|
|
+ DataOutputStream out = new DataOutputStream(responseBuf);
|
|
|
+ RpcResponseHeaderProto.Builder response =
|
|
|
+ RpcResponseHeaderProto.newBuilder();
|
|
|
+ response.setCallId(call.callId);
|
|
|
+ response.setStatus(status);
|
|
|
|
|
|
- if (status == Status.SUCCESS) {
|
|
|
+
|
|
|
+ if (status == RpcStatusProto.SUCCESS) {
|
|
|
try {
|
|
|
+ response.build().writeDelimitedTo(out);
|
|
|
rv.write(out);
|
|
|
} catch (Throwable t) {
|
|
|
LOG.warn("Error serializing call response for call " + call, t);
|
|
|
// Call back to same function - this is OK since the
|
|
|
// buffer is reset at the top, and since status is changed
|
|
|
// to ERROR it won't infinite loop.
|
|
|
- setupResponse(response, call, Status.ERROR,
|
|
|
+ setupResponse(responseBuf, call, RpcStatusProto.ERROR,
|
|
|
null, t.getClass().getName(),
|
|
|
StringUtils.stringifyException(t));
|
|
|
return;
|
|
|
}
|
|
|
} else {
|
|
|
+ if (status == RpcStatusProto.FATAL) {
|
|
|
+ response.setServerIpcVersionNum(Server.CURRENT_VERSION);
|
|
|
+ }
|
|
|
+ response.build().writeDelimitedTo(out);
|
|
|
WritableUtils.writeString(out, errorClass);
|
|
|
WritableUtils.writeString(out, error);
|
|
|
}
|
|
|
+ if (call.connection.useWrap) {
|
|
|
+ wrapWithSasl(responseBuf, call);
|
|
|
+ }
|
|
|
+ call.setResponse(ByteBuffer.wrap(responseBuf.toByteArray()));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Setup response for the IPC Call on Fatal Error from a
|
|
|
+ * client that is using old version of Hadoop.
|
|
|
+ * The response is serialized using the previous protocol's response
|
|
|
+ * layout.
|
|
|
+ *
|
|
|
+ * @param response buffer to serialize the response into
|
|
|
+ * @param call {@link Call} to which we are setting up the response
|
|
|
+ * @param rv return value for the IPC Call, if the call was successful
|
|
|
+ * @param errorClass error class, if the the call failed
|
|
|
+ * @param error error message, if the call failed
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private void setupResponseOldVersionFatal(ByteArrayOutputStream response,
|
|
|
+ Call call,
|
|
|
+ Writable rv, String errorClass, String error)
|
|
|
+ throws IOException {
|
|
|
+ final int OLD_VERSION_FATAL_STATUS = -1;
|
|
|
+ response.reset();
|
|
|
+ DataOutputStream out = new DataOutputStream(response);
|
|
|
+ out.writeInt(call.callId); // write call id
|
|
|
+ out.writeInt(OLD_VERSION_FATAL_STATUS); // write FATAL_STATUS
|
|
|
+ WritableUtils.writeString(out, errorClass);
|
|
|
+ WritableUtils.writeString(out, error);
|
|
|
+
|
|
|
if (call.connection.useWrap) {
|
|
|
wrapWithSasl(response, call);
|
|
|
}
|