|
@@ -59,7 +59,6 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
|
import org.apache.hadoop.io.DataOutputBuffer;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.io.Writable;
|
|
|
-import org.apache.hadoop.io.WritableUtils;
|
|
|
import org.apache.hadoop.io.retry.RetryPolicies;
|
|
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
|
|
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
|
|
@@ -945,31 +944,38 @@ public class Client {
|
|
|
touch();
|
|
|
|
|
|
try {
|
|
|
- RpcResponseHeaderProto response =
|
|
|
+ RpcResponseHeaderProto header =
|
|
|
RpcResponseHeaderProto.parseDelimitedFrom(in);
|
|
|
- if (response == null) {
|
|
|
+ if (header == null) {
|
|
|
throw new IOException("Response is null.");
|
|
|
}
|
|
|
|
|
|
- int callId = response.getCallId();
|
|
|
+ int callId = header.getCallId();
|
|
|
if (LOG.isDebugEnabled())
|
|
|
LOG.debug(getName() + " got value #" + callId);
|
|
|
|
|
|
Call call = calls.get(callId);
|
|
|
- RpcStatusProto status = response.getStatus();
|
|
|
+ RpcStatusProto status = header.getStatus();
|
|
|
if (status == RpcStatusProto.SUCCESS) {
|
|
|
Writable value = ReflectionUtils.newInstance(valueClass, conf);
|
|
|
value.readFields(in); // read value
|
|
|
call.setRpcResponse(value);
|
|
|
calls.remove(callId);
|
|
|
- } else if (status == RpcStatusProto.ERROR) {
|
|
|
- call.setException(new RemoteException(WritableUtils.readString(in),
|
|
|
- WritableUtils.readString(in)));
|
|
|
- calls.remove(callId);
|
|
|
- } else if (status == RpcStatusProto.FATAL) {
|
|
|
- // Close the connection
|
|
|
- markClosed(new RemoteException(WritableUtils.readString(in),
|
|
|
- WritableUtils.readString(in)));
|
|
|
+ } else { // Rpc Request failed
|
|
|
+ final String exceptionClassName = header.hasExceptionClassName() ?
|
|
|
+ header.getExceptionClassName() :
|
|
|
+ "ServerDidNotSetExceptionClassName";
|
|
|
+ final String errorMsg = header.hasErrorMsg() ?
|
|
|
+ header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;
|
|
|
+ RemoteException re =
|
|
|
+ new RemoteException(exceptionClassName, errorMsg);
|
|
|
+ if (status == RpcStatusProto.ERROR) {
|
|
|
+ call.setException(re);
|
|
|
+ calls.remove(callId);
|
|
|
+ } else if (status == RpcStatusProto.FATAL) {
|
|
|
+ // Close the connection
|
|
|
+ markClosed(re);
|
|
|
+ }
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
|
markClosed(e);
|