Browse Source

HDFS-17030. Limit wait time for getHAServiceState in ObserverReaderProxy (#5700)

Xing Lin 1 năm trước cách đây
mục cha
commit
ab47a658bd

+ 96 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java

@@ -24,7 +24,15 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.net.URI;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeoutException;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -88,6 +96,17 @@ public class ObserverReadProxyProvider<T>
   /** Observer probe retry period default to 10 min. */
   static final long OBSERVER_PROBE_RETRY_PERIOD_DEFAULT = 60 * 10 * 1000;
 
+  /**
+   * Timeout in ms to cancel the ha-state probe rpc request for an namenode.
+   * To disable timeout, set it to 0 or a negative value.
+   */
+  static final String NAMENODE_HA_STATE_PROBE_TIMEOUT =
+      HdfsClientConfigKeys.Failover.PREFIX + "namenode.ha-state.probe.timeout";
+  /**
+   * Default to disable namenode ha-state probe timeout.
+   */
+  static final long NAMENODE_HA_STATE_PROBE_TIMEOUT_DEFAULT = 0;
+
   /** The inner proxy provider used for active/standby failover. */
   private final AbstractNNFailoverProxyProvider<T> failoverProxy;
   /** List of all NameNode proxies. */
@@ -155,12 +174,32 @@ public class ObserverReadProxyProvider<T>
    */
   private long observerProbeRetryPeriodMs;
 
+  /**
+   * Timeout in ms when we try to get the HA state of a namenode.
+   */
+  private long namenodeHAStateProbeTimeoutMs;
+
   /**
    * The previous time where zero observer were found. If there was observer,
    * or it is initialization, this is set to 0.
    */
   private long lastObserverProbeTime;
 
+  /**
+   * Threadpool to send the getHAServiceState requests.
+   *
+   * One thread running all the time, with up to 4 threads. Idle threads will be killed after
+   * 1 minute. At most 1024 requests can be submitted before they start to be rejected.
+   *
+   * Each hdfs client will have its own ObserverReadProxyProvider. Thus,
+   * having 1 thread running should be sufficient in most cases.
+   * We are not expecting to receive a lot of outstanding RPC calls
+   * from a single hdfs client, thus setting the queue size to 1024.
+   */
+  private final ExecutorService nnProbingThreadPool =
+      new ThreadPoolExecutor(1, 4, 1L, TimeUnit.MINUTES,
+          new ArrayBlockingQueue<Runnable>(1024));
+
   /**
    * By default ObserverReadProxyProvider uses
    * {@link ConfiguredFailoverProxyProvider} for failover.
@@ -213,6 +252,8 @@ public class ObserverReadProxyProvider<T>
     observerProbeRetryPeriodMs = conf.getTimeDuration(
         OBSERVER_PROBE_RETRY_PERIOD_KEY,
         OBSERVER_PROBE_RETRY_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
+    namenodeHAStateProbeTimeoutMs = conf.getTimeDuration(NAMENODE_HA_STATE_PROBE_TIMEOUT,
+        NAMENODE_HA_STATE_PROBE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
 
     if (wrappedProxy instanceof ClientProtocol) {
       this.observerReadEnabled = true;
@@ -284,13 +325,67 @@ public class ObserverReadProxyProvider<T>
     }
     currentIndex = (currentIndex + 1) % nameNodeProxies.size();
     currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
-    currentProxy.setCachedState(getHAServiceState(currentProxy));
+    currentProxy.setCachedState(getHAServiceStateWithTimeout(currentProxy));
     LOG.debug("Changed current proxy from {} to {}",
         initial == null ? "none" : initial.proxyInfo,
         currentProxy.proxyInfo);
     return currentProxy;
   }
 
+  /**
+   * Execute getHAServiceState() call with a timeout, to avoid a long wait when
+   * an NN becomes irresponsive to rpc requests
+   * (when a thread/heap dump is being taken, e.g.).
+   *
+   * For each getHAServiceState() call, a task is created and submitted to a
+   * threadpool for execution. We will wait for a response up to
+   * namenodeHAStateProbeTimeoutSec and cancel these requests if they time out.
+   *
+   * The implementation is split into two functions so that we can unit test
+   * the second function.
+   */
+  HAServiceState getHAServiceStateWithTimeout(final NNProxyInfo<T> proxyInfo) {
+    Callable<HAServiceState> getHAServiceStateTask = () -> getHAServiceState(proxyInfo);
+
+    try {
+      Future<HAServiceState> task =
+          nnProbingThreadPool.submit(getHAServiceStateTask);
+      return getHAServiceStateWithTimeout(proxyInfo, task);
+    } catch (RejectedExecutionException e) {
+      LOG.warn("Run out of threads to submit the request to query HA state. "
+          + "Ok to return null and we will fallback to use active NN to serve "
+          + "this request.");
+      return null;
+    }
+  }
+
+  HAServiceState getHAServiceStateWithTimeout(final NNProxyInfo<T> proxyInfo,
+      Future<HAServiceState> task) {
+    HAServiceState state = null;
+    try {
+      if (namenodeHAStateProbeTimeoutMs > 0) {
+        state = task.get(namenodeHAStateProbeTimeoutMs, TimeUnit.MILLISECONDS);
+      } else {
+        // Disable timeout by waiting indefinitely when namenodeHAStateProbeTimeoutSec is set to 0
+        // or a negative value.
+        state = task.get();
+      }
+      LOG.debug("HA State for {} is {}", proxyInfo.proxyInfo, state);
+    } catch (TimeoutException e) {
+      // Cancel the task on timeout
+      String msg = String.format("Cancel NN probe task due to timeout for %s", proxyInfo.proxyInfo);
+      LOG.warn(msg, e);
+      if (task != null) {
+        task.cancel(true);
+      }
+    } catch (InterruptedException|ExecutionException e) {
+      String msg = String.format("Exception in NN probe task for %s", proxyInfo.proxyInfo);
+      LOG.warn(msg, e);
+    }
+
+    return state;
+  }
+
   /**
    * Fetch the service state from a proxy. If it is unable to be fetched,
    * assume it is in standby state, but log the exception.

+ 208 - 9
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.ha;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -24,7 +26,10 @@ import java.net.URI;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -36,20 +41,30 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
+import org.apache.hadoop.util.StopWatch;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.slf4j.event.Level;
 
 import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 
 import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.*;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
 
 
 /**
@@ -58,30 +73,42 @@ import static org.mockito.Mockito.when;
  * NameNode to communicate with.
  */
 public class TestObserverReadProxyProvider {
+  private final static long SLOW_RESPONSE_SLEEP_TIME = TimeUnit.SECONDS.toMillis(5); // 5 s
+  private final static long NAMENODE_HA_STATE_PROBE_TIMEOUT_SHORT = TimeUnit.SECONDS.toMillis(2);
+  private final static long NAMENODE_HA_STATE_PROBE_TIMEOUT_LONG = TimeUnit.SECONDS.toMillis(25);
+  private final GenericTestUtils.LogCapturer proxyLog =
+      GenericTestUtils.LogCapturer.captureLogs(ObserverReadProxyProvider.LOG);
 
   private static final LocatedBlock[] EMPTY_BLOCKS = new LocatedBlock[0];
   private String ns;
   private URI nnURI;
-  private Configuration conf;
 
   private ObserverReadProxyProvider<ClientProtocol> proxyProvider;
   private NameNodeAnswer[] namenodeAnswers;
   private String[] namenodeAddrs;
 
+  @BeforeClass
+  public static void setLogLevel() {
+    GenericTestUtils.setLogLevel(ObserverReadProxyProvider.LOG, Level.DEBUG);
+  }
+
   @Before
   public void setup() throws Exception {
     ns = "testcluster";
     nnURI = URI.create("hdfs://" + ns);
-    conf = new Configuration();
-    conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES, ns);
-    // Set observer probe retry period to 0. Required by the tests that
-    // transition observer back and forth
-    conf.setTimeDuration(
-        OBSERVER_PROBE_RETRY_PERIOD_KEY, 0, TimeUnit.MILLISECONDS);
-    conf.setBoolean(HdfsClientConfigKeys.Failover.RANDOM_ORDER, false);
   }
 
   private void setupProxyProvider(int namenodeCount) throws Exception {
+    setupProxyProvider(namenodeCount, new Configuration());
+  }
+
+  private void setupProxyProvider(int namenodeCount, long nnHAStateProbeTimeout) throws Exception {
+    Configuration conf = new Configuration();
+    conf.setLong(NAMENODE_HA_STATE_PROBE_TIMEOUT, nnHAStateProbeTimeout);
+    setupProxyProvider(namenodeCount, conf);
+  }
+
+  private void setupProxyProvider(int namenodeCount, Configuration conf) throws Exception {
     String[] namenodeIDs = new String[namenodeCount];
     namenodeAddrs = new String[namenodeCount];
     namenodeAnswers = new NameNodeAnswer[namenodeCount];
@@ -104,6 +131,12 @@ public class TestObserverReadProxyProvider {
     }
     conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
         Joiner.on(",").join(namenodeIDs));
+    conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES, ns);
+    // Set observer probe retry period to 0. Required by the tests that
+    // transition observer back and forth
+    conf.setTimeDuration(
+        OBSERVER_PROBE_RETRY_PERIOD_KEY, 0, TimeUnit.MILLISECONDS);
+    conf.setBoolean(HdfsClientConfigKeys.Failover.RANDOM_ORDER, false);
     proxyProvider = new ObserverReadProxyProvider<ClientProtocol>(conf, nnURI,
         ClientProtocol.class,
         new ClientHAProxyFactory<ClientProtocol>() {
@@ -145,7 +178,7 @@ public class TestObserverReadProxyProvider {
           }
         };
     ObserverReadProxyProvider<GetUserMappingsProtocol> userProxyProvider =
-        new ObserverReadProxyProvider<>(conf, nnURI,
+        new ObserverReadProxyProvider<>(proxyProvider.conf, nnURI,
             GetUserMappingsProtocol.class, proxyFactory);
     assertArrayEquals(fakeGroups,
         userProxyProvider.getProxy().proxy.getGroupsForUser(fakeUser));
@@ -325,6 +358,160 @@ public class TestObserverReadProxyProvider {
     assertHandledBy(1);
   }
 
+  /**
+   * Happy case for GetHAServiceStateWithTimeout.
+   */
+  @Test
+  public void testGetHAServiceStateWithTimeout() throws Exception {
+    proxyLog.clearOutput();
+
+    setupProxyProvider(1, NAMENODE_HA_STATE_PROBE_TIMEOUT_SHORT);
+    final HAServiceState state = HAServiceState.STANDBY;
+    NNProxyInfo<ClientProtocol> dummyNNProxyInfo =
+        (NNProxyInfo<ClientProtocol>) mock(NNProxyInfo.class);
+    Future<HAServiceState> task = mock(Future.class);
+    when(task.get(anyLong(), any(TimeUnit.class))).thenReturn(state);
+
+    HAServiceState state2 = proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task);
+    assertEquals(state, state2);
+    verify(task).get(anyLong(), any(TimeUnit.class));
+    verifyNoMoreInteractions(task);
+    assertEquals(1, StringUtils.countMatches(proxyLog.getOutput(),
+        "HA State for " + dummyNNProxyInfo.proxyInfo + " is " + state));
+    proxyLog.clearOutput();
+  }
+
+  /**
+   * Test TimeoutException for GetHAServiceStateWithTimeout.
+   */
+  @Test
+  public void testTimeoutExceptionGetHAServiceStateWithTimeout() throws Exception {
+    proxyLog.clearOutput();
+
+    setupProxyProvider(1, NAMENODE_HA_STATE_PROBE_TIMEOUT_SHORT);
+    NNProxyInfo<ClientProtocol> dummyNNProxyInfo =
+        (NNProxyInfo<ClientProtocol>) Mockito.mock(NNProxyInfo.class);
+    Future<HAServiceState> task = mock(Future.class);
+    TimeoutException e = new TimeoutException("Timeout");
+    when(task.get(anyLong(), any(TimeUnit.class))).thenThrow(e);
+
+    HAServiceState state = proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task);
+    assertNull(state);
+    verify(task).get(anyLong(), any(TimeUnit.class));
+    verify(task).cancel(true);
+    verifyNoMoreInteractions(task);
+    assertEquals(1, StringUtils.countMatches(proxyLog.getOutput(),
+        "Cancel NN probe task due to timeout for " + dummyNNProxyInfo.proxyInfo));
+    proxyLog.clearOutput();
+  }
+
+  /**
+   * Test InterruptedException for GetHAServiceStateWithTimeout.
+   */
+  @Test
+  public void testInterruptedExceptionGetHAServiceStateWithTimeout() throws Exception {
+    proxyLog.clearOutput();
+
+    setupProxyProvider(1, NAMENODE_HA_STATE_PROBE_TIMEOUT_SHORT);
+    NNProxyInfo<ClientProtocol> dummyNNProxyInfo =
+        (NNProxyInfo<ClientProtocol>) Mockito.mock(NNProxyInfo.class);
+    Future<HAServiceState> task = mock(Future.class);
+    InterruptedException e = new InterruptedException("Interrupted");
+    when(task.get(anyLong(), any(TimeUnit.class))).thenThrow(e);
+
+    HAServiceState state = proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task);
+    assertNull(state);
+    verify(task).get(anyLong(), any(TimeUnit.class));
+    verifyNoMoreInteractions(task);
+    assertEquals(1, StringUtils.countMatches(proxyLog.getOutput(),
+        "Exception in NN probe task for " + dummyNNProxyInfo.proxyInfo));
+    proxyLog.clearOutput();
+  }
+
+  /**
+   * Test ExecutionException for GetHAServiceStateWithTimeout.
+   */
+  @Test
+  public void testExecutionExceptionGetHAServiceStateWithTimeout() throws Exception {
+    proxyLog.clearOutput();
+
+    setupProxyProvider(1, NAMENODE_HA_STATE_PROBE_TIMEOUT_SHORT);
+    NNProxyInfo<ClientProtocol> dummyNNProxyInfo =
+        (NNProxyInfo<ClientProtocol>) Mockito.mock(NNProxyInfo.class);
+    Future<HAServiceState> task = mock(Future.class);
+    Exception e = new ExecutionException(new InterruptedException("Interrupted"));
+    when(task.get(anyLong(), any(TimeUnit.class))).thenThrow(e);
+
+    HAServiceState state = proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task);
+    assertNull(state);
+    verify(task).get(anyLong(), any(TimeUnit.class));
+    verifyNoMoreInteractions(task);
+    assertEquals(1, StringUtils.countMatches(proxyLog.getOutput(),
+        "Exception in NN probe task for " + dummyNNProxyInfo.proxyInfo));
+    proxyLog.clearOutput();
+  }
+
+  /**
+   * Test GetHAServiceState when timeout is disabled (test the else { task.get() } code path)
+   */
+  @Test
+  public void testGetHAServiceStateWithoutTimeout() throws Exception {
+    proxyLog.clearOutput();
+    setupProxyProvider(1, 0);
+
+    final HAServiceState state = HAServiceState.STANDBY;
+    NNProxyInfo<ClientProtocol> dummyNNProxyInfo =
+        (NNProxyInfo<ClientProtocol>) mock(NNProxyInfo.class);
+    Future<HAServiceState> task = mock(Future.class);
+    when(task.get()).thenReturn(state);
+
+    HAServiceState state2 = proxyProvider.getHAServiceStateWithTimeout(dummyNNProxyInfo, task);
+    assertEquals(state, state2);
+    verify(task).get();
+    verifyNoMoreInteractions(task);
+    assertEquals(1, StringUtils.countMatches(proxyLog.getOutput(),
+        "HA State for " + dummyNNProxyInfo.proxyInfo + " is " + state));
+    proxyLog.clearOutput();
+  }
+
+  /**
+   * Test getHAServiceState when we have a slow NN, using a 25s timeout.
+   * This is to verify the old behavior without being able to fast-fail (we can also set
+   * namenodeHAStateProbeTimeoutMs to 0 or a negative value and the rest of the test can stay
+   * the same).
+   *
+   * 5-second (SLOW_RESPONSE_SLEEP_TIME) latency is introduced and we expect that latency is added
+   * to the READ operation.
+   */
+  @Test
+  public void testStandbyGetHAServiceStateLongTimeout() throws Exception {
+    setupProxyProvider(4, NAMENODE_HA_STATE_PROBE_TIMEOUT_LONG);
+    namenodeAnswers[0].setActiveState();
+    namenodeAnswers[1].setSlowNode(true);
+    namenodeAnswers[3].setObserverState();
+
+    StopWatch watch = new StopWatch();
+    watch.start();
+    doRead();
+    long runtime = watch.now(TimeUnit.MILLISECONDS);
+    assertTrue("Read operation finished earlier than we expected",
+        runtime > SLOW_RESPONSE_SLEEP_TIME);
+  }
+
+  /**
+   * Test getHAServiceState using a 2s timeout with a slow standby.
+   * Fail the test if we don't complete it in 4s.
+   */
+  @Test(timeout = 4000)
+  public void testStandbyGetHAServiceStateTimeout() throws Exception {
+    setupProxyProvider(4, NAMENODE_HA_STATE_PROBE_TIMEOUT_SHORT);
+    namenodeAnswers[0].setActiveState();
+    namenodeAnswers[1].setSlowNode(true);
+    namenodeAnswers[3].setObserverState();
+
+    doRead();
+  }
+
   private void doRead() throws Exception {
     doRead(proxyProvider.getProxy().proxy);
   }
@@ -357,6 +544,7 @@ public class TestObserverReadProxyProvider {
 
     private volatile boolean unreachable = false;
     private volatile boolean retryActive = false;
+    private volatile boolean slowNode = false;
 
     // Standby state by default
     private volatile boolean allowWrites = false;
@@ -370,6 +558,12 @@ public class TestObserverReadProxyProvider {
         if (unreachable) {
           throw new IOException("Unavailable");
         }
+
+        // sleep to simulate slow rpc responses.
+        if (slowNode) {
+          Thread.sleep(SLOW_RESPONSE_SLEEP_TIME);
+        }
+
         // retryActive should be checked before getHAServiceState.
         // Check getHAServiceState first here only because in test,
         // it relies read call, which relies on getHAServiceState
@@ -416,6 +610,11 @@ public class TestObserverReadProxyProvider {
       this.unreachable = unreachable;
     }
 
+    // Whether this node should be slow in rpc response.
+    void setSlowNode(boolean slowNode) {
+      this.slowNode = slowNode;
+    }
+
     void setActiveState() {
       allowReads = true;
       allowWrites = true;