Przeglądaj źródła

HADOOP-9458. Fix RPC.getProxy to ensure it uses retries for getProtocolVersion too. Contributed by Tsz Wo Sze.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1471644 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 12 lat temu
rodzic
commit
8528de4a88

+ 3 - 0
CHANGES.txt

@@ -631,6 +631,9 @@ Release 1.2.0 - unreleased
     MAPREDUCE-5166. Fix ConcurrentModificationException due to insufficient
     synchronization on updates to task Counters. (Sandy Ryza via acmurthy)
 
+    HADOOP-9458. Fix RPC.getProxy to ensure it uses retries for
+    getProtocolVersion too. (szetszwo via acmurthy)
+
 Release 1.1.2 - 2013.01.30
 
   INCOMPATIBLE CHANGES

+ 14 - 5
src/core/org/apache/hadoop/ipc/RPC.java

@@ -390,7 +390,7 @@ public class RPC {
       long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
       Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException {
     return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
-        rpcTimeout, null);
+        rpcTimeout, null, true);
   }
 
   /** Construct a client-side proxy object that implements the named protocol,
@@ -399,7 +399,8 @@ public class RPC {
       Class<? extends VersionedProtocol> protocol,
       long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
       Configuration conf, SocketFactory factory, int rpcTimeout,
-      RetryPolicy connectionRetryPolicy) throws IOException {
+      RetryPolicy connectionRetryPolicy,
+      boolean checkVersion) throws IOException {
 
     if (UserGroupInformation.isSecurityEnabled()) {
       SaslRpcServer.init(conf);
@@ -408,11 +409,19 @@ public class RPC {
         rpcTimeout, connectionRetryPolicy);
     VersionedProtocol proxy = (VersionedProtocol)Proxy.newProxyInstance(
         protocol.getClassLoader(), new Class[]{protocol}, invoker);
+    
+    if (checkVersion) {
+      checkVersion(protocol, clientVersion, proxy);
+    }
+    return proxy;
+  }
+
+  /** Get server version and then compare it with client version. */
+  public static void checkVersion(Class<? extends VersionedProtocol> protocol,
+      long clientVersion, VersionedProtocol proxy)  throws IOException {
     long serverVersion = proxy.getProtocolVersion(protocol.getName(), 
                                                   clientVersion);
-    if (serverVersion == clientVersion) {
-      return proxy;
-    } else {
+    if (serverVersion != clientVersion) {
       throw new VersionMismatch(protocol.getName(), clientVersion, 
                                 serverVersion);
     }

+ 5 - 2
src/hdfs/org/apache/hadoop/hdfs/DFSClient.java

@@ -141,7 +141,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
                 DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
                 DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
                 DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT
-                ));  
+                ),
+        false);  
     }
 
   private static ClientProtocol createNamenode(ClientProtocol rpcNamenode,
@@ -177,8 +178,10 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     
     methodNameToPolicyMap.put("create", methodPolicy);
 
-    return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
+    final ClientProtocol cp = (ClientProtocol) RetryProxy.create(ClientProtocol.class,
         rpcNamenode, defaultPolicy, methodNameToPolicyMap);
+    RPC.checkVersion(ClientProtocol.class, ClientProtocol.versionID, cp);
+    return cp;
   }
 
   /** Create {@link ClientDatanodeProtocol} proxy with block/token */

+ 7 - 3
src/mapred/org/apache/hadoop/mapred/JobClient.java

@@ -515,8 +515,8 @@ public class JobClient extends Configured implements MRConstants, Tool  {
                 MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
                 MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY,
                 MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT
-                )
-            );
+                ),
+            false);
     
     return rpcJobSubmitClient;
   }
@@ -553,8 +553,12 @@ public class JobClient extends Configured implements MRConstants, Tool  {
     methodNameToPolicyMap.put("killJob", RetryPolicies.TRY_ONCE_THEN_FAIL);
     methodNameToPolicyMap.put("killTask", RetryPolicies.TRY_ONCE_THEN_FAIL);
     
-    return (JobSubmissionProtocol) RetryProxy.create(JobSubmissionProtocol.class,
+    final JobSubmissionProtocol jsp = (JobSubmissionProtocol) RetryProxy.create(
+        JobSubmissionProtocol.class,
         rpcJobSubmitClient, defaultPolicy, methodNameToPolicyMap);
+    RPC.checkVersion(JobSubmissionProtocol.class,
+        JobSubmissionProtocol.versionID, jsp);
+    return jsp;
   }
 
   @InterfaceAudience.Private

+ 49 - 8
src/test/org/apache/hadoop/ipc/TestRPC.java

@@ -18,31 +18,35 @@
 
 package org.apache.hadoop.ipc;
 
-import org.apache.hadoop.metrics2.MetricsSource;
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
-import java.lang.reflect.Method;
-
-import junit.framework.TestCase;
-
 import java.util.Arrays;
 
-import org.apache.commons.logging.*;
+import junit.framework.TestCase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC.VersionMismatch;
 import org.apache.hadoop.ipc.metrics.RpcInstrumentation;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.Service;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
-import org.apache.hadoop.security.AccessControlException;
-import static org.apache.hadoop.test.MetricsAsserts.*;
 
 /** Unit tests for RPC. */
 public class TestRPC extends TestCase {
@@ -133,6 +137,14 @@ public class TestRPC extends TestCase {
     }
   }
 
+  public static class TestVersionMismatchImpl extends TestImpl {
+    /** @return a different version. */
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion) {
+      return super.getProtocolVersion(protocol, clientVersion) + 1;
+    }
+  }
+
   //
   // an object that does a bunch of transactions
   //
@@ -480,6 +492,35 @@ public class TestRPC extends TestCase {
     }
     assertTrue(succeeded);
   }
+
+  /** Test RPC.checkVersion method. */
+  public void testCheckVersion() throws Exception {
+    Server server = RPC.getServer(new TestVersionMismatchImpl(), ADDRESS, 0, conf);
+    TestProtocol proxy = null;
+    try {
+      server.start();
+    
+      InetSocketAddress addr = NetUtils.getConnectAddress(server);
+      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      
+      // get proxy should succeed
+      proxy = (TestProtocol)RPC.getProxy(
+          TestProtocol.class, TestProtocol.versionID, addr, ugi, conf,
+          NetUtils.getSocketFactory(conf, TestProtocol.class), 0, null, false);  
+
+      try {
+        RPC.checkVersion(TestProtocol.class, TestProtocol.versionID, proxy);
+        fail("Check version should throw VersionMismatch");
+      } catch(VersionMismatch vm) {
+        LOG.info("The VersionMismatch is expected", vm);
+      }
+    } finally {
+      server.stop();
+      if (proxy!=null) {
+        RPC.stopProxy(proxy);
+      }
+    }
+  }
  
   public static void main(String[] args) throws Exception {