
HDFS-5118. Merge change r1520639 from branch-2.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.1-beta@1520658 13f79535-47bb-0310-9956-ffa450edef68
Jing Zhao 11 years ago
parent commit 2f680636b4

+ 62 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/LossyRetryInvocationHandler.java

@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.retry;
+
+import java.lang.reflect.Method;
+import java.net.UnknownHostException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * A dummy invocation handler extending RetryInvocationHandler. It drops the
+ * first N responses of each method call. This invocation handler is used
+ * only for testing.
+ */
+@InterfaceAudience.Private
+public class LossyRetryInvocationHandler<T> extends RetryInvocationHandler<T> {
+  private final int numToDrop;
+  private static final ThreadLocal<Integer> RetryCount = 
+      new ThreadLocal<Integer>();
+
+  public LossyRetryInvocationHandler(int numToDrop,
+      FailoverProxyProvider<T> proxyProvider, RetryPolicy retryPolicy) {
+    super(proxyProvider, retryPolicy);
+    this.numToDrop = numToDrop;
+  }
+
+  @Override
+  public Object invoke(Object proxy, Method method, Object[] args)
+      throws Throwable {
+    RetryCount.set(0);
+    return super.invoke(proxy, method, args);
+  }
+
+  @Override
+  protected Object invokeMethod(Method method, Object[] args) throws Throwable {
+    Object result = super.invokeMethod(method, args);
+    int retryCount = RetryCount.get();
+    if (retryCount < this.numToDrop) {
+      RetryCount.set(++retryCount);
+      LOG.info("Drop the response. Current retryCount == " + retryCount);
+      throw new UnknownHostException("Fake Exception");
+    } else {
+      LOG.info("retryCount == " + retryCount
+          + ". It's time to normally process the response");
+      return result;
+    }
+  }
+}

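For context, here is a minimal self-contained sketch of the drop-first-N pattern the handler implements; all names (DropFirstN, call, attempt) are invented for illustration and are not part of this commit. A per-thread counter is reset at the start of each logical call, and every underlying invocation is discarded in favor of a retriable fake exception until the counter reaches N:

import java.net.UnknownHostException;
import java.util.concurrent.Callable;

// Hypothetical standalone illustration of the pattern used by
// LossyRetryInvocationHandler; not part of this commit.
public class DropFirstN {
  private final int numToDrop;
  // Per-thread count of responses dropped for the current logical call.
  private static final ThreadLocal<Integer> RETRY_COUNT =
      new ThreadLocal<Integer>();

  public DropFirstN(int numToDrop) {
    this.numToDrop = numToDrop;
  }

  // Retries until an attempt is allowed to return its real result, standing
  // in for the retry policy that the real handler inherits from its parent.
  public <R> R call(Callable<R> target) throws Exception {
    RETRY_COUNT.set(0);
    while (true) {
      try {
        return attempt(target);
      } catch (UnknownHostException fake) {
        // a real retry policy would decide whether and when to retry
      }
    }
  }

  private <R> R attempt(Callable<R> target) throws Exception {
    R result = target.call();       // the underlying call actually succeeds
    int dropped = RETRY_COUNT.get();
    if (dropped < numToDrop) {
      RETRY_COUNT.set(dropped + 1); // ...but the response is thrown away
      throw new UnknownHostException("Fake Exception");
    }
    return result;
  }
}

With numToDrop == 2, the first two attempts of every call() raise the fake exception and the third returns the real result, which is exactly the shape a retry-cache test needs.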
+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java

@@ -64,7 +64,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
     this(proxyProvider, retryPolicy, Collections.<String, RetryPolicy>emptyMap());
   }
 
-  RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
+  protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
       RetryPolicy defaultPolicy,
       Map<String, RetryPolicy> methodNameToPolicyMap) {
     this.proxyProvider = proxyProvider;

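The one-line change above widens the three-argument constructor from package-private to protected, which is what allows LossyRetryInvocationHandler to chain to it. A minimal sketch of the kind of subclass this enables (the class name is hypothetical):

import java.util.Collections;

import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.io.retry.RetryInvocationHandler;
import org.apache.hadoop.io.retry.RetryPolicy;

// Hypothetical subclass; it compiles against the protected constructor even
// from outside the org.apache.hadoop.io.retry package.
public class PassThroughRetryInvocationHandler<T>
    extends RetryInvocationHandler<T> {
  public PassThroughRetryInvocationHandler(FailoverProxyProvider<T> provider,
      RetryPolicy policy) {
    super(provider, policy, Collections.<String, RetryPolicy>emptyMap());
  }
}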
+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -35,6 +35,9 @@ Release 2.1.1-beta - UNRELEASED
     HDFS-5136 MNT EXPORT should give the full group list which can mount the
     exports (brandonli)
 
+    HDFS-5118. Provide testing support for DFSClient to drop RPC responses.
+    (jing9)
+
   IMPROVEMENTS
 
    HDFS-4513. Clarify in the WebHDFS REST API that all JSON responses may

+ 23 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -99,6 +99,7 @@ import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -112,13 +113,13 @@ import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
@@ -142,6 +143,7 @@ import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -447,7 +449,11 @@ public class DFSClient implements java.io.Closeable {
   
   /** 
    * Create a new DFSClient connected to the given nameNodeUri or rpcNamenode.
-   * Exactly one of nameNodeUri or rpcNamenode must be null.
+   * If HA is enabled and a positive value is set for 
+   * {@link DFSConfigKeys#DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY} in the
+   * configuration, the DFSClient will use {@link LossyRetryInvocationHandler}
+   * as its RetryInvocationHandler. Otherwise, exactly one of nameNodeUri or
+   * rpcNamenode must be null.
    */
   @VisibleForTesting
   public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
@@ -471,7 +477,20 @@ public class DFSClient implements java.io.Closeable {
     this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" + 
         DFSUtil.getRandom().nextInt()  + "_" + Thread.currentThread().getId();
     
-    if (rpcNamenode != null) {
+    int numResponseToDrop = conf.getInt(
+        DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
+        DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
+    if (numResponseToDrop > 0) {
+      // This case is used for testing.
+      LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
+          + " is set to " + numResponseToDrop
+          + ", this hacked client will proactively drop responses");
+      NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = NameNodeProxies
+          .createProxyWithLossyRetryHandler(conf, nameNodeUri,
+              ClientProtocol.class, numResponseToDrop);
+      this.dtService = proxyInfo.getDelegationTokenService();
+      this.namenode = proxyInfo.getProxy();
+    } else if (rpcNamenode != null) {
       // This case is used for testing.
       Preconditions.checkArgument(nameNodeUri == null);
       this.namenode = rpcNamenode;
@@ -500,7 +519,7 @@ public class DFSClient implements java.io.Closeable {
     
     this.peerCache = PeerCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
   }
-
+  
   /**
    * Return the socket addresses to use with each configured
    * local interface. Local interfaces may be specified by IP

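A hedged usage sketch, not part of this commit: with the new key set to any positive value, the constructor above takes the lossy path instead of the normal proxy creation. The drop count of 3, the method name, and the assumption that haUri names an HA nameservice are all illustrative:

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class LossyClientExample {
  // haUri must point at an HA nameservice with a failover proxy provider
  // configured; the lossy path only supports the HA setup.
  static DFSClient lossyClient(URI haUri) throws IOException {
    Configuration conf = new HdfsConfiguration();
    // Drop the first 3 NameNode responses of every RPC call.
    conf.setInt(
        DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY, 3);
    return new DFSClient(haUri, conf);
  }
}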
+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -491,6 +491,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT = 600000; // 10 minutes
   public static final String DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY = "dfs.namenode.retrycache.heap.percent";
   public static final float DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT = 0.03f;
+  
+  // The number of NN responses the client proactively drops in each RPC call.
+  // To test the NN retry cache, set this property to a positive value.
+  public static final String DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY = "dfs.client.test.drop.namenode.response.number";
+  public static final int DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT = 0;
+
 
   // Hidden configuration undocumented in hdfs-site. xml
   // Timeout to wait for block receiver and responder thread to stop

+ 64 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java

@@ -17,10 +17,18 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.HashMap;
@@ -48,6 +56,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
@@ -144,6 +153,61 @@ public class NameNodeProxies {
       return new ProxyAndInfo<T>(proxy, dtService);
     }
   }
+  
+  /**
+   * Generate a dummy namenode proxy instance that utilizes our hacked
+   * {@link LossyRetryInvocationHandler}. A proxy instance generated by this
+   * method will proactively drop RPC responses. Currently this method only
+   * supports an HA setup; an IllegalStateException will be thrown if the
+   * given configuration is not for HA.
+   * 
+   * @param config the configuration containing the required IPC
+   *        properties, client failover configurations, etc.
+   * @param nameNodeUri the URI pointing either to a specific NameNode
+   *        or to a logical nameservice.
+   * @param xface the IPC interface which should be created
+   * @param numResponseToDrop The number of responses to drop for each RPC call
+   * @return an object containing both the proxy and the associated
+   *         delegation token service it corresponds to
+   * @throws IOException if there is an error creating the proxy
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler(
+      Configuration config, URI nameNodeUri, Class<T> xface,
+      int numResponseToDrop) throws IOException {
+    Preconditions.checkArgument(numResponseToDrop > 0);
+    Class<FailoverProxyProvider<T>> failoverProxyProviderClass = 
+        getFailoverProxyProviderClass(config, nameNodeUri, xface);
+    if (failoverProxyProviderClass != null) { // HA case
+      FailoverProxyProvider<T> failoverProxyProvider = 
+          createFailoverProxyProvider(config, failoverProxyProviderClass, 
+              xface, nameNodeUri);
+      int delay = config.getInt(
+          DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY,
+          DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT);
+      int maxCap = config.getInt(
+          DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY,
+          DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT);
+      int maxFailoverAttempts = config.getInt(
+          DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
+          DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
+      InvocationHandler dummyHandler = new LossyRetryInvocationHandler<T>(
+              numResponseToDrop, failoverProxyProvider,
+              RetryPolicies.failoverOnNetworkException(
+                  RetryPolicies.TRY_ONCE_THEN_FAIL, 
+                  Math.max(numResponseToDrop + 1, maxFailoverAttempts), delay, 
+                  maxCap));
+      
+      T proxy = (T) Proxy.newProxyInstance(
+          failoverProxyProvider.getInterface().getClassLoader(),
+          new Class[] { xface }, dummyHandler);
+      Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri);
+      return new ProxyAndInfo<T>(proxy, dtService);
+    } else {
+      throw new IllegalStateException("Currently, creating a proxy using " +
+          "LossyRetryInvocationHandler requires an NN HA setup");
+    }
+  }
 
   /**
    * Creates an explicitly non-HA-enabled proxy object. Most of the time you