|
@@ -50,8 +50,9 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
-import org.apache.hadoop.ipc.RpcPayloadHeader.*;
|
|
|
|
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
|
|
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
|
|
|
|
+import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadHeaderProto;
|
|
|
|
+import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadOperationProto;
|
|
import org.apache.hadoop.io.IOUtils;
|
|
import org.apache.hadoop.io.IOUtils;
|
|
import org.apache.hadoop.io.Writable;
|
|
import org.apache.hadoop.io.Writable;
|
|
import org.apache.hadoop.io.WritableUtils;
|
|
import org.apache.hadoop.io.WritableUtils;
|
|
@@ -163,10 +164,10 @@ public class Client {
|
|
final Writable rpcRequest; // the serialized rpc request - RpcPayload
|
|
final Writable rpcRequest; // the serialized rpc request - RpcPayload
|
|
Writable rpcResponse; // null if rpc has error
|
|
Writable rpcResponse; // null if rpc has error
|
|
IOException error; // exception, null if success
|
|
IOException error; // exception, null if success
|
|
- final RpcKind rpcKind; // Rpc EngineKind
|
|
|
|
|
|
+ final RPC.RpcKind rpcKind; // Rpc EngineKind
|
|
boolean done; // true when call is done
|
|
boolean done; // true when call is done
|
|
|
|
|
|
- protected Call(RpcKind rpcKind, Writable param) {
|
|
|
|
|
|
+ protected Call(RPC.RpcKind rpcKind, Writable param) {
|
|
this.rpcKind = rpcKind;
|
|
this.rpcKind = rpcKind;
|
|
this.rpcRequest = param;
|
|
this.rpcRequest = param;
|
|
synchronized (Client.this) {
|
|
synchronized (Client.this) {
|
|
@@ -613,7 +614,7 @@ public class Client {
|
|
this.in = new DataInputStream(new BufferedInputStream(inStream));
|
|
this.in = new DataInputStream(new BufferedInputStream(inStream));
|
|
}
|
|
}
|
|
this.out = new DataOutputStream(new BufferedOutputStream(outStream));
|
|
this.out = new DataOutputStream(new BufferedOutputStream(outStream));
|
|
- writeHeader();
|
|
|
|
|
|
+ writeConnectionContext();
|
|
|
|
|
|
// update last activity time
|
|
// update last activity time
|
|
touch();
|
|
touch();
|
|
@@ -704,16 +705,17 @@ public class Client {
|
|
out.flush();
|
|
out.flush();
|
|
}
|
|
}
|
|
|
|
|
|
- /* Write the protocol header for each connection
|
|
|
|
|
|
+ /* Write the connection context header for each connection
|
|
* Out is not synchronized because only the first thread does this.
|
|
* Out is not synchronized because only the first thread does this.
|
|
*/
|
|
*/
|
|
- private void writeHeader() throws IOException {
|
|
|
|
|
|
+ private void writeConnectionContext() throws IOException {
|
|
// Write out the ConnectionHeader
|
|
// Write out the ConnectionHeader
|
|
DataOutputBuffer buf = new DataOutputBuffer();
|
|
DataOutputBuffer buf = new DataOutputBuffer();
|
|
connectionContext.writeTo(buf);
|
|
connectionContext.writeTo(buf);
|
|
|
|
|
|
// Write out the payload length
|
|
// Write out the payload length
|
|
int bufLen = buf.getLength();
|
|
int bufLen = buf.getLength();
|
|
|
|
+
|
|
out.writeInt(bufLen);
|
|
out.writeInt(bufLen);
|
|
out.write(buf.getData(), 0, bufLen);
|
|
out.write(buf.getData(), 0, bufLen);
|
|
}
|
|
}
|
|
@@ -806,21 +808,22 @@ public class Client {
|
|
if (LOG.isDebugEnabled())
|
|
if (LOG.isDebugEnabled())
|
|
LOG.debug(getName() + " sending #" + call.id);
|
|
LOG.debug(getName() + " sending #" + call.id);
|
|
|
|
|
|
- //for serializing the
|
|
|
|
- //data to be written
|
|
|
|
|
|
+ // Serializing the data to be written.
|
|
|
|
+ // Format:
|
|
|
|
+ // 0) Length of rest below (1 + 2)
|
|
|
|
+ // 1) PayloadHeader - is serialized Delimited hence contains length
|
|
|
|
+ // 2) the Payload - the RpcRequest
|
|
|
|
+ //
|
|
d = new DataOutputBuffer();
|
|
d = new DataOutputBuffer();
|
|
- d.writeInt(0); // placeholder for data length
|
|
|
|
- RpcPayloadHeader header = new RpcPayloadHeader(
|
|
|
|
- call.rpcKind, RpcPayloadOperation.RPC_FINAL_PAYLOAD, call.id);
|
|
|
|
- header.write(d);
|
|
|
|
|
|
+ RpcPayloadHeaderProto header = ProtoUtil.makeRpcPayloadHeader(
|
|
|
|
+ call.rpcKind, RpcPayloadOperationProto.RPC_FINAL_PAYLOAD, call.id);
|
|
|
|
+ header.writeDelimitedTo(d);
|
|
call.rpcRequest.write(d);
|
|
call.rpcRequest.write(d);
|
|
byte[] data = d.getData();
|
|
byte[] data = d.getData();
|
|
- int dataLength = d.getLength() - 4;
|
|
|
|
- data[0] = (byte)((dataLength >>> 24) & 0xff);
|
|
|
|
- data[1] = (byte)((dataLength >>> 16) & 0xff);
|
|
|
|
- data[2] = (byte)((dataLength >>> 8) & 0xff);
|
|
|
|
- data[3] = (byte)(dataLength & 0xff);
|
|
|
|
- out.write(data, 0, dataLength + 4);//write the data
|
|
|
|
|
|
+
|
|
|
|
+ int totalLength = d.getLength();
|
|
|
|
+ out.writeInt(totalLength); // Total Length
|
|
|
|
+ out.write(data, 0, totalLength);//PayloadHeader + RpcRequest
|
|
out.flush();
|
|
out.flush();
|
|
}
|
|
}
|
|
} catch(IOException e) {
|
|
} catch(IOException e) {
|
|
@@ -937,7 +940,7 @@ public class Client {
|
|
private int index;
|
|
private int index;
|
|
|
|
|
|
public ParallelCall(Writable param, ParallelResults results, int index) {
|
|
public ParallelCall(Writable param, ParallelResults results, int index) {
|
|
- super(RpcKind.RPC_WRITABLE, param);
|
|
|
|
|
|
+ super(RPC.RpcKind.RPC_WRITABLE, param);
|
|
this.results = results;
|
|
this.results = results;
|
|
this.index = index;
|
|
this.index = index;
|
|
}
|
|
}
|
|
@@ -1022,22 +1025,22 @@ public class Client {
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Same as {@link #call(RpcPayloadHeader.RpcKind, Writable, ConnectionId)}
|
|
|
|
|
|
+ * Same as {@link #call(RPC.RpcKind, Writable, ConnectionId)}
|
|
* for RPC_BUILTIN
|
|
* for RPC_BUILTIN
|
|
*/
|
|
*/
|
|
public Writable call(Writable param, InetSocketAddress address)
|
|
public Writable call(Writable param, InetSocketAddress address)
|
|
throws InterruptedException, IOException {
|
|
throws InterruptedException, IOException {
|
|
- return call(RpcKind.RPC_BUILTIN, param, address);
|
|
|
|
|
|
+ return call(RPC.RpcKind.RPC_BUILTIN, param, address);
|
|
|
|
|
|
}
|
|
}
|
|
/** Make a call, passing <code>param</code>, to the IPC server running at
|
|
/** Make a call, passing <code>param</code>, to the IPC server running at
|
|
* <code>address</code>, returning the value. Throws exceptions if there are
|
|
* <code>address</code>, returning the value. Throws exceptions if there are
|
|
* network problems or if the remote code threw an exception.
|
|
* network problems or if the remote code threw an exception.
|
|
- * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable,
|
|
|
|
|
|
+ * @deprecated Use {@link #call(RPC.RpcKind, Writable,
|
|
* ConnectionId)} instead
|
|
* ConnectionId)} instead
|
|
*/
|
|
*/
|
|
@Deprecated
|
|
@Deprecated
|
|
- public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress address)
|
|
|
|
|
|
+ public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress address)
|
|
throws InterruptedException, IOException {
|
|
throws InterruptedException, IOException {
|
|
return call(rpcKind, param, address, null);
|
|
return call(rpcKind, param, address, null);
|
|
}
|
|
}
|
|
@@ -1047,11 +1050,11 @@ public class Client {
|
|
* the value.
|
|
* the value.
|
|
* Throws exceptions if there are network problems or if the remote code
|
|
* Throws exceptions if there are network problems or if the remote code
|
|
* threw an exception.
|
|
* threw an exception.
|
|
- * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable,
|
|
|
|
|
|
+ * @deprecated Use {@link #call(RPC.RpcKind, Writable,
|
|
* ConnectionId)} instead
|
|
* ConnectionId)} instead
|
|
*/
|
|
*/
|
|
@Deprecated
|
|
@Deprecated
|
|
- public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr,
|
|
|
|
|
|
+ public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr,
|
|
UserGroupInformation ticket)
|
|
UserGroupInformation ticket)
|
|
throws InterruptedException, IOException {
|
|
throws InterruptedException, IOException {
|
|
ConnectionId remoteId = ConnectionId.getConnectionId(addr, null, ticket, 0,
|
|
ConnectionId remoteId = ConnectionId.getConnectionId(addr, null, ticket, 0,
|
|
@@ -1065,11 +1068,11 @@ public class Client {
|
|
* timeout, returning the value.
|
|
* timeout, returning the value.
|
|
* Throws exceptions if there are network problems or if the remote code
|
|
* Throws exceptions if there are network problems or if the remote code
|
|
* threw an exception.
|
|
* threw an exception.
|
|
- * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable,
|
|
|
|
|
|
+ * @deprecated Use {@link #call(RPC.RpcKind, Writable,
|
|
* ConnectionId)} instead
|
|
* ConnectionId)} instead
|
|
*/
|
|
*/
|
|
@Deprecated
|
|
@Deprecated
|
|
- public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr,
|
|
|
|
|
|
+ public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr,
|
|
Class<?> protocol, UserGroupInformation ticket,
|
|
Class<?> protocol, UserGroupInformation ticket,
|
|
int rpcTimeout)
|
|
int rpcTimeout)
|
|
throws InterruptedException, IOException {
|
|
throws InterruptedException, IOException {
|
|
@@ -1080,7 +1083,7 @@ public class Client {
|
|
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Same as {@link #call(RpcPayloadHeader.RpcKind, Writable, InetSocketAddress,
|
|
|
|
|
|
+ * Same as {@link #call(RPC.RpcKind, Writable, InetSocketAddress,
|
|
* Class, UserGroupInformation, int, Configuration)}
|
|
* Class, UserGroupInformation, int, Configuration)}
|
|
* except that rpcKind is writable.
|
|
* except that rpcKind is writable.
|
|
*/
|
|
*/
|
|
@@ -1090,7 +1093,7 @@ public class Client {
|
|
throws InterruptedException, IOException {
|
|
throws InterruptedException, IOException {
|
|
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
|
|
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
|
|
ticket, rpcTimeout, conf);
|
|
ticket, rpcTimeout, conf);
|
|
- return call(RpcKind.RPC_BUILTIN, param, remoteId);
|
|
|
|
|
|
+ return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -1101,7 +1104,7 @@ public class Client {
|
|
* value. Throws exceptions if there are network problems or if the remote
|
|
* value. Throws exceptions if there are network problems or if the remote
|
|
* code threw an exception.
|
|
* code threw an exception.
|
|
*/
|
|
*/
|
|
- public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr,
|
|
|
|
|
|
+ public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr,
|
|
Class<?> protocol, UserGroupInformation ticket,
|
|
Class<?> protocol, UserGroupInformation ticket,
|
|
int rpcTimeout, Configuration conf)
|
|
int rpcTimeout, Configuration conf)
|
|
throws InterruptedException, IOException {
|
|
throws InterruptedException, IOException {
|
|
@@ -1111,12 +1114,12 @@ public class Client {
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Same as {link {@link #call(RpcPayloadHeader.RpcKind, Writable, ConnectionId)}
|
|
|
|
|
|
+ * Same as {link {@link #call(RPC.RpcKind, Writable, ConnectionId)}
|
|
* except the rpcKind is RPC_BUILTIN
|
|
* except the rpcKind is RPC_BUILTIN
|
|
*/
|
|
*/
|
|
public Writable call(Writable param, ConnectionId remoteId)
|
|
public Writable call(Writable param, ConnectionId remoteId)
|
|
throws InterruptedException, IOException {
|
|
throws InterruptedException, IOException {
|
|
- return call(RpcKind.RPC_BUILTIN, param, remoteId);
|
|
|
|
|
|
+ return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -1130,7 +1133,7 @@ public class Client {
|
|
* Throws exceptions if there are network problems or if the remote code
|
|
* Throws exceptions if there are network problems or if the remote code
|
|
* threw an exception.
|
|
* threw an exception.
|
|
*/
|
|
*/
|
|
- public Writable call(RpcKind rpcKind, Writable rpcRequest,
|
|
|
|
|
|
+ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
|
|
ConnectionId remoteId) throws InterruptedException, IOException {
|
|
ConnectionId remoteId) throws InterruptedException, IOException {
|
|
Call call = new Call(rpcKind, rpcRequest);
|
|
Call call = new Call(rpcKind, rpcRequest);
|
|
Connection connection = getConnection(remoteId, call);
|
|
Connection connection = getConnection(remoteId, call);
|