|
@@ -384,7 +384,8 @@ public class Client {
|
|
|
private final RetryPolicy connectionRetryPolicy;
|
|
|
private final int maxRetriesOnSasl;
|
|
|
private int maxRetriesOnSocketTimeouts;
|
|
|
- private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
|
|
+ private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
|
|
+ private final boolean tcpLowLatency; // if T then use low-delay QoS
|
|
|
private boolean doPing; //do we need to send ping message
|
|
|
private int pingInterval; // how often sends ping to the server in msecs
|
|
|
private ByteArrayOutputStream pingRequest; // ping message
|
|
@@ -413,6 +414,7 @@ public class Client {
|
|
|
this.maxRetriesOnSasl = remoteId.getMaxRetriesOnSasl();
|
|
|
this.maxRetriesOnSocketTimeouts = remoteId.getMaxRetriesOnSocketTimeouts();
|
|
|
this.tcpNoDelay = remoteId.getTcpNoDelay();
|
|
|
+ this.tcpLowLatency = remoteId.getTcpLowLatency();
|
|
|
this.doPing = remoteId.getDoPing();
|
|
|
if (doPing) {
|
|
|
// construct a RPC header with the callId as the ping callId
|
|
@@ -585,6 +587,20 @@ public class Client {
|
|
|
this.socket.setTcpNoDelay(tcpNoDelay);
|
|
|
this.socket.setKeepAlive(true);
|
|
|
|
|
|
+ if (tcpLowLatency) {
|
|
|
+ /*
|
|
|
+ * This allows intermediate switches to shape IPC traffic
|
|
|
+ * differently from Shuffle/HDFS DataStreamer traffic.
|
|
|
+ *
|
|
|
+ * IPTOS_RELIABILITY (0x04) | IPTOS_LOWDELAY (0x10)
|
|
|
+ *
|
|
|
+ * Prefer to optimize connect() speed & response latency over net
|
|
|
+ * throughput.
|
|
|
+ */
|
|
|
+ this.socket.setTrafficClass(0x04 | 0x10);
|
|
|
+ this.socket.setPerformancePreferences(1, 2, 0);
|
|
|
+ }
|
|
|
+
|
|
|
/*
|
|
|
* Bind the socket to the host specified in the principal name of the
|
|
|
* client, to ensure Server matching address of the client connection
|
|
@@ -1549,6 +1565,7 @@ public class Client {
|
|
|
// the max. no. of retries for socket connections on time out exceptions
|
|
|
private final int maxRetriesOnSocketTimeouts;
|
|
|
private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
|
|
+ private final boolean tcpLowLatency; // if T then use low-delay QoS
|
|
|
private final boolean doPing; //do we need to send ping message
|
|
|
private final int pingInterval; // how often sends ping to the server in msecs
|
|
|
private String saslQop; // here for testing
|
|
@@ -1575,6 +1592,10 @@ public class Client {
|
|
|
this.tcpNoDelay = conf.getBoolean(
|
|
|
CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY,
|
|
|
CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_DEFAULT);
|
|
|
+ this.tcpLowLatency = conf.getBoolean(
|
|
|
+ CommonConfigurationKeysPublic.IPC_CLIENT_LOW_LATENCY,
|
|
|
+ CommonConfigurationKeysPublic.IPC_CLIENT_LOW_LATENCY_DEFAULT
|
|
|
+ );
|
|
|
this.doPing = conf.getBoolean(
|
|
|
CommonConfigurationKeys.IPC_CLIENT_PING_KEY,
|
|
|
CommonConfigurationKeys.IPC_CLIENT_PING_DEFAULT);
|
|
@@ -1610,11 +1631,17 @@ public class Client {
|
|
|
public int getMaxRetriesOnSocketTimeouts() {
|
|
|
return maxRetriesOnSocketTimeouts;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ /** disable nagle's algorithm */
|
|
|
boolean getTcpNoDelay() {
|
|
|
return tcpNoDelay;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ /** use low-latency QoS bits over TCP */
|
|
|
+ boolean getTcpLowLatency() {
|
|
|
+ return tcpLowLatency;
|
|
|
+ }
|
|
|
+
|
|
|
boolean getDoPing() {
|
|
|
return doPing;
|
|
|
}
|