Browse Source

Revert "HADOOP-12672. RPC timeout should not override IPC ping interval (iwasakims)"

This reverts commit acafc950d9347769c3729d571121b3525c6d5eb2.
Steve Loughran 9 years ago
parent
commit
b409ce89e9

+ 0 - 2
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -902,8 +902,6 @@ Release 2.6.5 - UNRELEASED
     HADOOP-12789. log classpath of ApplicationClassLoader at INFO level
     HADOOP-12789. log classpath of ApplicationClassLoader at INFO level
     (Sangjin Lee via mingma)
     (Sangjin Lee via mingma)
 
 
-    HADOOP-12672. RPC timeout should not override IPC ping interval (iwasakims)
-
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
   BUG FIXES
   BUG FIXES

+ 15 - 20
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

@@ -383,16 +383,15 @@ public class Client {
     private Socket socket = null;                 // connected socket
     private Socket socket = null;                 // connected socket
     private DataInputStream in;
     private DataInputStream in;
     private DataOutputStream out;
     private DataOutputStream out;
-    private final int rpcTimeout;
+    private int rpcTimeout;
     private int maxIdleTime; //connections will be culled if it was idle for 
     private int maxIdleTime; //connections will be culled if it was idle for 
     //maxIdleTime msecs
     //maxIdleTime msecs
     private final RetryPolicy connectionRetryPolicy;
     private final RetryPolicy connectionRetryPolicy;
     private final int maxRetriesOnSasl;
     private final int maxRetriesOnSasl;
     private int maxRetriesOnSocketTimeouts;
     private int maxRetriesOnSocketTimeouts;
-    private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
-    private final boolean doPing; //do we need to send ping message
-    private final int pingInterval; // how often sends ping to the server
-    private final int soTimeout; // used by ipc ping and rpc timeout
+    private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
+    private boolean doPing; //do we need to send ping message
+    private int pingInterval; // how often sends ping to the server in msecs
     private ByteArrayOutputStream pingRequest; // ping message
     private ByteArrayOutputStream pingRequest; // ping message
     
     
     // currently active calls
     // currently active calls
@@ -430,9 +429,6 @@ public class Client {
         pingHeader.writeDelimitedTo(pingRequest);
         pingHeader.writeDelimitedTo(pingRequest);
       }
       }
       this.pingInterval = remoteId.getPingInterval();
       this.pingInterval = remoteId.getPingInterval();
-      this.soTimeout =
-          (rpcTimeout == 0 || (doPing && pingInterval < rpcTimeout))?
-              this.pingInterval : this.rpcTimeout;
       this.serviceClass = serviceClass;
       this.serviceClass = serviceClass;
       if (LOG.isDebugEnabled()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("The ping interval is " + this.pingInterval + " ms.");
         LOG.debug("The ping interval is " + this.pingInterval + " ms.");
@@ -483,12 +479,12 @@ public class Client {
 
 
       /* Process timeout exception
       /* Process timeout exception
        * if the connection is not going to be closed or 
        * if the connection is not going to be closed or 
-       * the RPC is not timed out yet, send a ping.
+       * is not configured to have a RPC timeout, send a ping.
+       * (if rpcTimeout is not set to be 0, then RPC should timeout.
+       * otherwise, throw the timeout exception.
        */
        */
-      private void handleTimeout(SocketTimeoutException e, int waiting)
-          throws IOException {
-        if (shouldCloseConnection.get() || !running.get() ||
-            (0 < rpcTimeout && rpcTimeout <= waiting)) {
+      private void handleTimeout(SocketTimeoutException e) throws IOException {
+        if (shouldCloseConnection.get() || !running.get() || rpcTimeout > 0) {
           throw e;
           throw e;
         } else {
         } else {
           sendPing();
           sendPing();
@@ -502,13 +498,11 @@ public class Client {
        */
        */
       @Override
       @Override
       public int read() throws IOException {
       public int read() throws IOException {
-        int waiting = 0;
         do {
         do {
           try {
           try {
             return super.read();
             return super.read();
           } catch (SocketTimeoutException e) {
           } catch (SocketTimeoutException e) {
-            waiting += soTimeout;
-            handleTimeout(e, waiting);
+            handleTimeout(e);
           }
           }
         } while (true);
         } while (true);
       }
       }
@@ -521,13 +515,11 @@ public class Client {
        */
        */
       @Override
       @Override
       public int read(byte[] buf, int off, int len) throws IOException {
       public int read(byte[] buf, int off, int len) throws IOException {
-        int waiting = 0;
         do {
         do {
           try {
           try {
             return super.read(buf, off, len);
             return super.read(buf, off, len);
           } catch (SocketTimeoutException e) {
           } catch (SocketTimeoutException e) {
-            waiting += soTimeout;
-            handleTimeout(e, waiting);
+            handleTimeout(e);
           }
           }
         } while (true);
         } while (true);
       }
       }
@@ -620,7 +612,10 @@ public class Client {
           }
           }
           
           
           NetUtils.connect(this.socket, server, connectionTimeout);
           NetUtils.connect(this.socket, server, connectionTimeout);
-          this.socket.setSoTimeout(soTimeout);
+          if (rpcTimeout > 0) {
+            pingInterval = rpcTimeout;  // rpcTimeout overwrites pingInterval
+          }
+          this.socket.setSoTimeout(pingInterval);
           return;
           return;
         } catch (ConnectTimeoutException toe) {
         } catch (ConnectTimeoutException toe) {
           /* Check for an address change and update the local reference.
           /* Check for an address change and update the local reference.

+ 5 - 4
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -980,7 +980,7 @@ for ldap providers in the same way as above does.
   <value>true</value>
   <value>true</value>
   <description>Send a ping to the server when timeout on reading the response,
   <description>Send a ping to the server when timeout on reading the response,
   if set to true. If no failure is detected, the client retries until at least
   if set to true. If no failure is detected, the client retries until at least
-  a byte is read or the time given by ipc.client.rpc-timeout.ms is passed.
+  a byte is read.
   </description>
   </description>
 </property>
 </property>
 
 
@@ -997,9 +997,10 @@ for ldap providers in the same way as above does.
   <name>ipc.client.rpc-timeout.ms</name>
   <name>ipc.client.rpc-timeout.ms</name>
   <value>0</value>
   <value>0</value>
   <description>Timeout on waiting response from server, in milliseconds.
   <description>Timeout on waiting response from server, in milliseconds.
-  If ipc.client.ping is set to true and this rpc-timeout is greater than
-  the value of ipc.ping.interval, the effective value of the rpc-timeout is
-  rounded up to multiple of ipc.ping.interval.
+  Currently this timeout works only when ipc.client.ping is set to true
+  because it uses the same facilities with IPC ping.
+  The timeout overrides the ipc.ping.interval and client will throw exception
+  instead of sending ping when the interval is passed.
   </description>
   </description>
 </property>
 </property>
 
 

+ 0 - 68
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

@@ -1107,74 +1107,6 @@ public class TestRPC {
     }
     }
   }
   }
 
 
-  /**
-   *  Test RPC timeout when ipc.client.ping is false.
-   */
-  @Test(timeout=30000)
-  public void testClientRpcTimeoutWithoutPing() throws Exception {
-    final Server server = new RPC.Builder(conf)
-        .setProtocol(TestProtocol.class).setInstance(new TestImpl())
-        .setBindAddress(ADDRESS).setPort(0)
-        .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true)
-        .build();
-    server.start();
-
-    final Configuration conf = new Configuration();
-    conf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, false);
-    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
-    final TestProtocol proxy =
-        RPC.getProxy(TestProtocol.class, TestProtocol.versionID,
-            NetUtils.getConnectAddress(server), conf);
-
-    try {
-      proxy.sleep(3000);
-      fail("RPC should time out.");
-    } catch (SocketTimeoutException e) {
-      LOG.info("got expected timeout.", e);
-    } finally {
-      server.stop();
-      RPC.stopProxy(proxy);
-    }
-  }
-
-  /**
-   *  Test RPC timeout greater than ipc.ping.interval.
-   */
-  @Test(timeout=30000)
-  public void testClientRpcTimeoutGreaterThanPingInterval() throws Exception {
-    final Server server = new RPC.Builder(conf)
-        .setProtocol(TestProtocol.class).setInstance(new TestImpl())
-        .setBindAddress(ADDRESS).setPort(0)
-        .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true)
-        .build();
-    server.start();
-
-    final Configuration conf = new Configuration();
-    conf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
-    conf.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, 800);
-    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
-    final TestProtocol proxy =
-        RPC.getProxy(TestProtocol.class, TestProtocol.versionID,
-            NetUtils.getConnectAddress(server), conf);
-
-    // should not time out.
-    proxy.sleep(300);
-
-    // should not time out because effective rpc-timeout is
-    // multiple of ping interval: 1600 (= 800 * (1000 / 800 + 1))
-    proxy.sleep(1300);
-
-    try {
-      proxy.sleep(2000);
-      fail("RPC should time out.");
-    } catch (SocketTimeoutException e) {
-      LOG.info("got expected timeout.", e);
-    } finally {
-      server.stop();
-      RPC.stopProxy(proxy);
-    }
-  }
-
   public static void main(String[] args) throws IOException {
   public static void main(String[] args) throws IOException {
     new TestRPC().testCallsInternal(conf);
     new TestRPC().testCallsInternal(conf);