Browse Source

svn merge -c 1171221 from trunk for HADOOP-7635.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1233861 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 13 years ago
parent
commit
45131b31c3

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -17,6 +17,9 @@ Release 0.23-PB - Unreleased
 
 
     HADOOP-7607. Simplify the RPC proxy cleanup process. (atm)
     HADOOP-7607. Simplify the RPC proxy cleanup process. (atm)
 
 
+    HADOOP-7635. RetryInvocationHandler should release underlying resources on
+    close (atm)
+
     HADOOP-7687 Make getProtocolSignature public  (sanjay)
     HADOOP-7687 Make getProtocolSignature public  (sanjay)
 
 
     HADOOP-7693. Enhance AvroRpcEngine to support the new #addProtocol
     HADOOP-7693. Enhance AvroRpcEngine to support the new #addProtocol

+ 8 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java

@@ -17,7 +17,10 @@
  */
  */
 package org.apache.hadoop.io.retry;
 package org.apache.hadoop.io.retry;
 
 
+import java.io.IOException;
+
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.RPC;
 
 
 /**
 /**
  * An implementation of {@link FailoverProxyProvider} which does nothing in the
  * An implementation of {@link FailoverProxyProvider} which does nothing in the
@@ -49,4 +52,9 @@ public class DefaultFailoverProxyProvider implements FailoverProxyProvider {
     // Nothing to do.
     // Nothing to do.
   }
   }
 
 
+  @Override
+  public void close() throws IOException {
+    RPC.stopProxy(proxy);
+  }
+
 }
 }

+ 3 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java

@@ -17,6 +17,8 @@
  */
  */
 package org.apache.hadoop.io.retry;
 package org.apache.hadoop.io.retry;
 
 
+import java.io.Closeable;
+
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceStability;
 
 
 /**
 /**
@@ -27,7 +29,7 @@ import org.apache.hadoop.classification.InterfaceStability;
  * {@link RetryPolicy}.
  * {@link RetryPolicy}.
  */
  */
 @InterfaceStability.Evolving
 @InterfaceStability.Evolving
-public interface FailoverProxyProvider {
+public interface FailoverProxyProvider extends Closeable {
 
 
   /**
   /**
    * Get the proxy object which should be used until the next failover event
    * Get the proxy object which should be used until the next failover event

+ 8 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java

@@ -17,6 +17,8 @@
  */
  */
 package org.apache.hadoop.io.retry;
 package org.apache.hadoop.io.retry;
 
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Method;
@@ -27,7 +29,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
 
 
-class RetryInvocationHandler implements InvocationHandler {
+class RetryInvocationHandler implements InvocationHandler, Closeable {
   public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
   public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
   private FailoverProxyProvider proxyProvider;
   private FailoverProxyProvider proxyProvider;
   
   
@@ -103,4 +105,9 @@ class RetryInvocationHandler implements InvocationHandler {
     }
     }
   }
   }
 
 
+  @Override
+  public void close() throws IOException {
+    proxyProvider.close();
+  }
+
 }
 }

+ 5 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java

@@ -57,6 +57,11 @@ public class TestFailoverProxy {
     public Class<?> getInterface() {
     public Class<?> getInterface() {
       return iface;
       return iface;
     }
     }
+
+    @Override
+    public void close() throws IOException {
+      // Nothing to do.
+    }
     
     
   }
   }
   
   

+ 126 - 13
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

@@ -18,28 +18,38 @@
 
 
 package org.apache.hadoop.ipc;
 package org.apache.hadoop.ipc;
 
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
 import java.lang.management.ThreadMXBean;
+import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
 import java.util.Arrays;
 import java.util.Arrays;
 
 
-import junit.framework.TestCase;
+import javax.net.SocketFactory;
 
 
 import org.apache.commons.logging.*;
 import org.apache.commons.logging.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.Service;
 import org.apache.hadoop.security.authorize.Service;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+import static org.junit.Assert.*;
 
 
 import com.google.protobuf.DescriptorProtos;
 import com.google.protobuf.DescriptorProtos;
 import com.google.protobuf.DescriptorProtos.EnumDescriptorProto;
 import com.google.protobuf.DescriptorProtos.EnumDescriptorProto;
@@ -49,18 +59,22 @@ import static org.apache.hadoop.test.MetricsAsserts.*;
 import static org.mockito.Mockito.*;
 import static org.mockito.Mockito.*;
 
 
 /** Unit tests for RPC. */
 /** Unit tests for RPC. */
-public class TestRPC extends TestCase {
+@SuppressWarnings("deprecation")
+public class TestRPC {
   private static final String ADDRESS = "0.0.0.0";
   private static final String ADDRESS = "0.0.0.0";
 
 
   public static final Log LOG =
   public static final Log LOG =
     LogFactory.getLog(TestRPC.class);
     LogFactory.getLog(TestRPC.class);
   
   
   private static Configuration conf = new Configuration();
   private static Configuration conf = new Configuration();
+  
+  static {
+    conf.setClass("rpc.engine." + StoppedProtocol.class.getName(),
+        StoppedRpcEngine.class, RpcEngine.class);
+  }
 
 
   int datasize = 1024*100;
   int datasize = 1024*100;
   int numThreads = 50;
   int numThreads = 50;
-
-  public TestRPC(String name) { super(name); }
 	
 	
   public interface TestProtocol extends VersionedProtocol {
   public interface TestProtocol extends VersionedProtocol {
     public static final long versionID = 1L;
     public static final long versionID = 1L;
@@ -207,6 +221,74 @@ public class TestRPC extends TestCase {
     }
     }
   }
   }
   
   
+  /**
+   * A basic interface for testing client-side RPC resource cleanup.
+   */
+  private static interface StoppedProtocol {
+    long versionID = 0;
+
+    public void stop();
+  }
+  
+  /**
+   * A class used for testing cleanup of client side RPC resources.
+   */
+  private static class StoppedRpcEngine implements RpcEngine {
+
+    @Override
+    public Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs,
+        UserGroupInformation ticket, Configuration conf)
+        throws IOException, InterruptedException {
+      return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+        InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+        SocketFactory factory, int rpcTimeout) throws IOException {
+      T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
+              new Class[] { protocol }, new StoppedInvocationHandler());
+      return new ProtocolProxy<T>(protocol, proxy, false);
+    }
+
+    @Override
+    public org.apache.hadoop.ipc.RPC.Server getServer(Class<?> protocol,
+        Object instance, String bindAddress, int port, int numHandlers,
+        int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf,
+        SecretManager<? extends TokenIdentifier> secretManager) throws IOException {
+      return null;
+    }
+    
+  }
+
+  /**
+   * An invocation handler which does nothing when invoking methods, and just
+   * counts the number of times close() is called.
+   */
+  private static class StoppedInvocationHandler
+      implements InvocationHandler, Closeable {
+    
+    private int closeCalled = 0;
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args)
+        throws Throwable {
+          return null;
+    }
+
+    @Override
+    public void close() throws IOException {
+      closeCalled++;
+    }
+    
+    public int getCloseCalled() {
+      return closeCalled;
+    }
+    
+  }
+  
+  @Test
   public void testConfRpc() throws Exception {
   public void testConfRpc() throws Exception {
     Server server = RPC.getServer(TestProtocol.class,
     Server server = RPC.getServer(TestProtocol.class,
                                   new TestImpl(), ADDRESS, 0, 1, false, conf, null);
                                   new TestImpl(), ADDRESS, 0, 1, false, conf, null);
@@ -229,6 +311,7 @@ public class TestRPC extends TestCase {
     server.stop();    
     server.stop();    
   }
   }
 
 
+  @Test
   public void testSlowRpc() throws Exception {
   public void testSlowRpc() throws Exception {
     System.out.println("Testing Slow RPC");
     System.out.println("Testing Slow RPC");
     // create a server with two handlers
     // create a server with two handlers
@@ -273,11 +356,12 @@ public class TestRPC extends TestCase {
     }
     }
   }
   }
   
   
-  public void testRPCConf(Configuration conf) throws Exception {
-    
+  @Test
+  public void testCalls() throws Exception {
+    testCallsInternal(conf);
   }
   }
-
-  public void testCalls(Configuration conf) throws Exception {
+  
+  private void testCallsInternal(Configuration conf) throws Exception {
     Server server = RPC.getServer(TestProtocol.class,
     Server server = RPC.getServer(TestProtocol.class,
                                   new TestImpl(), ADDRESS, 0, conf);
                                   new TestImpl(), ADDRESS, 0, conf);
     TestProtocol proxy = null;
     TestProtocol proxy = null;
@@ -384,6 +468,7 @@ public class TestRPC extends TestCase {
     }
     }
   }
   }
   
   
+  @Test
   public void testStandaloneClient() throws IOException {
   public void testStandaloneClient() throws IOException {
     try {
     try {
       TestProtocol proxy = RPC.waitForProxy(TestProtocol.class,
       TestProtocol proxy = RPC.waitForProxy(TestProtocol.class,
@@ -450,6 +535,7 @@ public class TestRPC extends TestCase {
     }
     }
   }
   }
   
   
+  @Test
   public void testAuthorization() throws Exception {
   public void testAuthorization() throws Exception {
     Configuration conf = new Configuration();
     Configuration conf = new Configuration();
     conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
     conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
@@ -481,20 +567,48 @@ public class TestRPC extends TestCase {
     Configuration conf = new Configuration();
     Configuration conf = new Configuration();
     
     
     conf.setBoolean("ipc.client.ping", false);
     conf.setBoolean("ipc.client.ping", false);
-    new TestRPC("testnoPings").testCalls(conf);
+    new TestRPC().testCallsInternal(conf);
     
     
     conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2);
     conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2);
-    new TestRPC("testnoPings").testCalls(conf);
+    new TestRPC().testCallsInternal(conf);
   }
   }
 
 
   /**
   /**
    * Test stopping a non-registered proxy
    * Test stopping a non-registered proxy
    * @throws Exception
    * @throws Exception
    */
    */
+  @Test
   public void testStopNonRegisteredProxy() throws Exception {
   public void testStopNonRegisteredProxy() throws Exception {
     RPC.stopProxy(mock(TestProtocol.class));
     RPC.stopProxy(mock(TestProtocol.class));
   }
   }
   
   
+  @Test
+  public void testStopProxy() throws IOException {
+    StoppedProtocol proxy = (StoppedProtocol) RPC.getProxy(StoppedProtocol.class,
+        StoppedProtocol.versionID, null, conf);
+    StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler)
+        Proxy.getInvocationHandler(proxy);
+    assertEquals(invocationHandler.getCloseCalled(), 0);
+    RPC.stopProxy(proxy);
+    assertEquals(invocationHandler.getCloseCalled(), 1);
+  }
+  
+  @Test
+  public void testWrappedStopProxy() throws IOException {
+    StoppedProtocol wrappedProxy = (StoppedProtocol) RPC.getProxy(StoppedProtocol.class,
+        StoppedProtocol.versionID, null, conf);
+    StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler)
+        Proxy.getInvocationHandler(wrappedProxy);
+    
+    StoppedProtocol proxy = (StoppedProtocol) RetryProxy.create(StoppedProtocol.class,
+        wrappedProxy, RetryPolicies.RETRY_FOREVER);
+    
+    assertEquals(invocationHandler.getCloseCalled(), 0);
+    RPC.stopProxy(proxy);
+    assertEquals(invocationHandler.getCloseCalled(), 1);
+  }
+  
+  @Test
   public void testErrorMsgForInsecureClient() throws Exception {
   public void testErrorMsgForInsecureClient() throws Exception {
     final Server server = RPC.getServer(TestProtocol.class,
     final Server server = RPC.getServer(TestProtocol.class,
         new TestImpl(), ADDRESS, 0, 5, true, conf, null);
         new TestImpl(), ADDRESS, 0, 5, true, conf, null);
@@ -567,10 +681,10 @@ public class TestRPC extends TestCase {
     return count;
     return count;
   }
   }
 
 
-
   /**
   /**
    * Test that server.stop() properly stops all threads
    * Test that server.stop() properly stops all threads
    */
    */
+  @Test
   public void testStopsAllThreads() throws Exception {
   public void testStopsAllThreads() throws Exception {
     int threadsBefore = countThreads("Server$Listener$Reader");
     int threadsBefore = countThreads("Server$Listener$Reader");
     assertEquals("Expect no Reader threads running before test",
     assertEquals("Expect no Reader threads running before test",
@@ -591,8 +705,7 @@ public class TestRPC extends TestCase {
   }
   }
   
   
   public static void main(String[] args) throws Exception {
   public static void main(String[] args) throws Exception {
-
-    new TestRPC("test").testCalls(conf);
+    new TestRPC().testCallsInternal(conf);
 
 
   }
   }
 }
 }