Kaynağa Gözat

Merge -r 720601:720602 from trunk to move the change log of HADOOP-4659 to branch 0.19.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.19@720608 13f79535-47bb-0310-9956-ffa450edef68
Hairong Kuang 16 yıl önce
ebeveyn
işleme
ca5fddcc5b

+ 3 - 0
CHANGES.txt

@@ -1023,6 +1023,9 @@ Release 0.18.3 - Unreleased
     HADOOP-4061. Throttle Datanode decommission monitoring in Namenode.
     HADOOP-4061. Throttle Datanode decommission monitoring in Namenode.
     (szetszwo)
     (szetszwo)
 
 
+    HADOOP-4659. Root cause of connection failure is being lost to code that
+    uses it for delaying startup. (Steve Loughran and Hairong via hairong)
+
 Release 0.18.2 - 2008-11-03
 Release 0.18.2 - 2008-11-03
 
 
   BUG FIXES
   BUG FIXES

+ 34 - 5
src/core/org/apache/hadoop/ipc/Client.java

@@ -22,6 +22,7 @@ import java.net.Socket;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
 import java.net.UnknownHostException;
+import java.net.ConnectException;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.io.DataInputStream;
 import java.io.DataInputStream;
@@ -352,7 +353,7 @@ public class Client {
       socket = null;
       socket = null;
 
 
       // throw the exception if the maximum number of retries is reached
       // throw the exception if the maximum number of retries is reached
-      if (curRetries == maxRetries) {
+      if (curRetries >= maxRetries) {
         throw ioe;
         throw ioe;
       }
       }
 
 
@@ -696,10 +697,7 @@ public class Client {
           call.error.fillInStackTrace();
           call.error.fillInStackTrace();
           throw call.error;
           throw call.error;
         } else { // local exception
         } else { // local exception
-          throw (IOException)new IOException(
-              "Call to "+ addr + " failed on local exception: "
-                      + call.error.getMessage())
-                  .initCause(call.error);
+          throw wrapException(addr, call.error);
         }
         }
       } else {
       } else {
         return call.value;
         return call.value;
@@ -707,6 +705,37 @@ public class Client {
     }
     }
   }
   }
 
 
+  /**
+   * Take an IOException and the address we were trying to connect to
+   * and return an IOException with the input exception as the cause.
+   * The new exception provides the stack trace of the place where 
+   * the exception is thrown and some extra diagnostics information.
+   * If the exception is ConnectException or SocketTimeoutException, 
+   * return a new one of the same type; otherwise return an IOException.
+   * 
+   * @param addr target address
+   * @param exception the relevant exception
+   * @return an exception to throw
+   */
+  private IOException wrapException(InetSocketAddress addr,
+                                         IOException exception) {
+    if (exception instanceof ConnectException) {
+      //connection refused; include the host:port in the error
+      return (ConnectException)new ConnectException(
+           "Call to " + addr + " failed on connection exception: " + exception)
+                    .initCause(exception);
+    } else if (exception instanceof SocketTimeoutException) {
+      return (SocketTimeoutException)new SocketTimeoutException(
+           "Call to " + addr + " failed on socket timeout exception: "
+                      + exception).initCause(exception);
+    } else {
+      return (IOException)new IOException(
+           "Call to " + addr + " failed on local exception: " + exception)
+                                 .initCause(exception);
+
+    }
+  }
+
   /** Makes a set of calls in parallel.  Each parameter is sent to the
   /** Makes a set of calls in parallel.  Each parameter is sent to the
    * corresponding address.  When all values are available, or have timed out
    * corresponding address.  When all values are available, or have timed out
    * or errored, the collected results are returned in an array.  The array
    * or errored, the collected results are returned in an array.  The array

+ 31 - 2
src/core/org/apache/hadoop/ipc/RPC.java

@@ -279,18 +279,47 @@ public class RPC {
   }
   }
   
   
   public static VersionedProtocol waitForProxy(Class protocol,
   public static VersionedProtocol waitForProxy(Class protocol,
+      long clientVersion,
+      InetSocketAddress addr,
+      Configuration conf
+      ) throws IOException {
+    return waitForProxy(protocol, clientVersion, addr, conf, Long.MAX_VALUE);
+  }
+
+  /**
+   * Get a proxy connection to a remote server
+   * @param protocol protocol class
+   * @param clientVersion client version
+   * @param addr remote address
+   * @param conf configuration to use
+   * @param timeout time in milliseconds before giving up
+   * @return the proxy
+   * @throws IOException if the far end throws a RemoteException
+   */
+  static VersionedProtocol waitForProxy(Class protocol,
                                                long clientVersion,
                                                long clientVersion,
                                                InetSocketAddress addr,
                                                InetSocketAddress addr,
-                                               Configuration conf
-                                               ) throws IOException {
+                                               Configuration conf,
+                                               long timeout
+                                               ) throws IOException { 
+    long startTime = System.currentTimeMillis();
+    IOException ioe;
     while (true) {
     while (true) {
       try {
       try {
         return getProxy(protocol, clientVersion, addr, conf);
         return getProxy(protocol, clientVersion, addr, conf);
       } catch(ConnectException se) {  // namenode has not been started
       } catch(ConnectException se) {  // namenode has not been started
         LOG.info("Server at " + addr + " not available yet, Zzzzz...");
         LOG.info("Server at " + addr + " not available yet, Zzzzz...");
+        ioe = se;
       } catch(SocketTimeoutException te) {  // namenode is busy
       } catch(SocketTimeoutException te) {  // namenode is busy
         LOG.info("Problem connecting to server: " + addr);
         LOG.info("Problem connecting to server: " + addr);
+        ioe = te;
       }
       }
+      // check if timed out
+      if (System.currentTimeMillis()-timeout >= startTime) {
+        throw ioe;
+      }
+
+      // wait for retry
       try {
       try {
         Thread.sleep(1000);
         Thread.sleep(1000);
       } catch (InterruptedException ie) {
       } catch (InterruptedException ie) {

+ 12 - 2
src/test/org/apache/hadoop/ipc/TestRPC.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.ipc;
 package org.apache.hadoop.ipc;
 
 
 import java.io.IOException;
 import java.io.IOException;
+import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
 import java.lang.reflect.Method;
 import java.lang.reflect.Method;
 
 
@@ -32,7 +33,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Writable;
 
 
-import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetUtils;
 
 
 /** Unit tests for RPC. */
 /** Unit tests for RPC. */
@@ -117,7 +117,6 @@ public class TestRPC extends TestCase {
     }
     }
 
 
     public int[] exchange(int[] values) {
     public int[] exchange(int[] values) {
-      int sum = 0;
       for (int i = 0; i < values.length; i++) {
       for (int i = 0; i < values.length; i++) {
         values[i] = i;
         values[i] = i;
       }
       }
@@ -309,6 +308,17 @@ public class TestRPC extends TestCase {
       if(proxy!=null) RPC.stopProxy(proxy);
       if(proxy!=null) RPC.stopProxy(proxy);
     }
     }
   }
   }
+  
+  public void testStandaloneClient() throws IOException {
+    try {
+      RPC.waitForProxy(TestProtocol.class,
+        TestProtocol.versionID, new InetSocketAddress(ADDRESS, 20), conf, 15000L);
+      fail("We should not have reached here");
+    } catch (ConnectException ioe) {
+      //this is what we expected
+    }
+  }
+  
   public static void main(String[] args) throws Exception {
   public static void main(String[] args) throws Exception {
 
 
     new TestRPC("test").testCalls();
     new TestRPC("test").testCalls();