
HDFS-13778. [SBN read] TestStateAlignmentContextWithHA should use real ObserverReadProxyProvider instead of AlignmentContextProxyProvider. Contributed by Konstantin Shvachko and Plamen Jeliazkov. Also fix a number of java7 incompatibilities from previous SbN read commits.

Konstantin V Shvachko 6 years ago
parent
commit
7eacf5f8f1

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java

@@ -568,7 +568,7 @@ public class QuorumJournalManager implements JournalManager {
     LOG.info("Selected loggers with >= " + maxAllowedTxns +
         " transactions starting from " + fromTxnId);
     PriorityQueue<EditLogInputStream> allStreams = new PriorityQueue<>(
-        JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
+        responseMap.size(), JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
     for (GetJournaledEditsResponseProto resp : responseMap.values()) {
       long endTxnId = fromTxnId - 1 +
           Math.min(maxAllowedTxns, resp.getTxnCount());
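
The one-line change above is one of the Java 7 fixes mentioned in the commit message: the comparator-only PriorityQueue constructor only exists since Java 8, so Java 7 code must use the (initialCapacity, comparator) overload. A minimal standalone sketch of the two forms (the class and comparator here are illustrative, not from the patch):

import java.util.Comparator;
import java.util.PriorityQueue;

public class PriorityQueueCompat {
  public static void main(String[] args) {
    Comparator<String> byLength = new Comparator<String>() {
      @Override
      public int compare(String a, String b) {
        return Integer.compare(a.length(), b.length());
      }
    };

    // Java 8+: comparator-only constructor, default initial capacity.
    // PriorityQueue<String> queue = new PriorityQueue<>(byLength);

    // Java 7: the comparator overload also requires an initial capacity.
    PriorityQueue<String> queue = new PriorityQueue<String>(16, byLength);
    queue.add("bb");
    queue.add("a");
    System.out.println(queue.peek()); // prints "a" (shortest first)
  }
}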

+ 5 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.lang.annotation.Annotation;
 import java.lang.reflect.Method;
 import java.util.HashSet;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -50,9 +51,10 @@ class GlobalStateIdContext implements AlignmentContext {
     // For now, only ClientProtocol methods can be coordinated, so only checking
     // against ClientProtocol.
     for (Method method : ClientProtocol.class.getDeclaredMethods()) {
-      if (method.isAnnotationPresent(ReadOnly.class) &&
-          method.getAnnotationsByType(ReadOnly.class)[0].isCoordinated()) {
-        coordinatedMethods.add(method.getName());
+      for (Annotation anno : method.getAnnotations()) {
+        if ((anno instanceof ReadOnly) && ((ReadOnly) anno).isCoordinated()) {
+          coordinatedMethods.add(method.getName());
+        }
       }
     }
   }
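
This hunk is another Java 7 fix: Method.getAnnotationsByType() arrived in Java 8 together with repeatable annotations, so the patch scans getAnnotations() and filters with instanceof instead. A self-contained sketch of the same pattern, using a made-up @Coordinated annotation in place of Hadoop's @ReadOnly:

import java.lang.annotation.Annotation;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Method;

public class AnnotationScanCompat {
  @Retention(RetentionPolicy.RUNTIME)
  @interface Coordinated {
    boolean value() default true;
  }

  interface Protocol {
    @Coordinated void read();
    void write();
  }

  public static void main(String[] args) {
    for (Method method : Protocol.class.getDeclaredMethods()) {
      // Java 7-friendly: iterate all annotations and filter by type,
      // instead of Java 8's method.getAnnotationsByType(Coordinated.class).
      for (Annotation anno : method.getAnnotations()) {
        if (anno instanceof Coordinated && ((Coordinated) anno).value()) {
          System.out.println(method.getName() + " is coordinated");
        }
      }
    }
  }
}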

+ 57 - 132
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java

@@ -18,28 +18,24 @@
 
 package org.apache.hadoop.hdfs;
 
-import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
 import static org.junit.Assert.assertThat;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory;
-import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
-import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
-import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URI;
@@ -64,55 +60,31 @@ import org.mockito.stubbing.Answer;
  * to the most recent alignment state of the server.
  */
 public class TestStateAlignmentContextWithHA {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestStateAlignmentContextWithHA.class.getName());
 
   private static final int NUMDATANODES = 1;
   private static final int NUMCLIENTS = 10;
-  private static final int NUMFILES = 300;
+  private static final int NUMFILES = 120;
   private static final Configuration CONF = new HdfsConfiguration();
-  private static final String NAMESERVICE = "nameservice";
   private static final List<ClientGSIContext> AC_LIST = new ArrayList<>();
 
   private static MiniDFSCluster cluster;
   private static List<Worker> clients;
-  private static ClientGSIContext spy;
 
   private DistributedFileSystem dfs;
   private int active = 0;
   private int standby = 1;
 
-  static class AlignmentContextProxyProvider<T>
-      extends ConfiguredFailoverProxyProvider<T> {
+  static class ORPPwithAlignmentContexts<T extends ClientProtocol>
+      extends ObserverReadProxyProvider<T> {
 
-    private ClientGSIContext alignmentContext;
-
-    public AlignmentContextProxyProvider(
+    public ORPPwithAlignmentContexts(
         Configuration conf, URI uri, Class<T> xface,
         HAProxyFactory<T> factory) throws IOException {
       super(conf, uri, xface, factory);
 
-      // Create and set AlignmentContext in HAProxyFactory.
-      // All proxies by factory will now have AlignmentContext assigned.
-      this.alignmentContext = (spy != null ? spy : new ClientGSIContext());
-      ((ClientHAProxyFactory<T>) factory).setAlignmentContext(alignmentContext);
-
-      AC_LIST.add(alignmentContext);
-    }
-  }
-
-  static class SpyConfiguredContextProxyProvider<T>
-      extends ConfiguredFailoverProxyProvider<T> {
-
-    private ClientGSIContext alignmentContext;
-
-    public SpyConfiguredContextProxyProvider(
-        Configuration conf, URI uri, Class<T> xface,
-        HAProxyFactory<T> factory) throws IOException {
-      super(conf, uri, xface, factory);
-
-      // Create but DON'T set in HAProxyFactory.
-      this.alignmentContext = (spy != null ? spy : new ClientGSIContext());
-
-      AC_LIST.add(alignmentContext);
+      AC_LIST.add((ClientGSIContext) getAlignmentContext());
     }
   }
 
@@ -124,23 +96,21 @@ public class TestStateAlignmentContextWithHA {
     CONF.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
     CONF.setBoolean("fs.hdfs.impl.disable.cache", true);
 
-    MiniDFSNNTopology.NSConf nsConf = new MiniDFSNNTopology.NSConf(NAMESERVICE);
-    nsConf.addNN(new MiniDFSNNTopology.NNConf("nn1"));
-    nsConf.addNN(new MiniDFSNNTopology.NNConf("nn2"));
-
     cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(NUMDATANODES)
-        .nnTopology(MiniDFSNNTopology.simpleHATopology().addNameservice(nsConf))
+        .nnTopology(MiniDFSNNTopology.simpleHATopology(3))
         .build();
     cluster.waitActive();
     cluster.transitionToActive(0);
+    cluster.transitionToObserver(2);
+
+    String nameservice = HATestUtil.getLogicalHostname(cluster);
+    HATestUtil.setFailoverConfigurations(cluster, CONF, nameservice, 0);
+    CONF.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX +
+        "." + nameservice, ORPPwithAlignmentContexts.class.getName());
   }
 
   @Before
   public void before() throws IOException, URISyntaxException {
-    killWorkers();
-    HATestUtil.setFailoverConfigurations(cluster, CONF, NAMESERVICE, 0);
-    CONF.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX +
-        "." + NAMESERVICE, AlignmentContextProxyProvider.class.getName());
     dfs = (DistributedFileSystem) FileSystem.get(CONF);
   }
 
@@ -154,6 +124,7 @@ public class TestStateAlignmentContextWithHA {
 
   @After
   public void after() throws IOException {
+    killWorkers();
     cluster.transitionToStandby(1);
     cluster.transitionToActive(0);
     active = 0;
@@ -163,26 +134,6 @@ public class TestStateAlignmentContextWithHA {
       dfs = null;
     }
     AC_LIST.clear();
-    spy = null;
-  }
-
-  /**
-   * This test checks if after a client writes we can see the state id in
-   * updated via the response.
-   */
-  @Test
-  public void testNoStateOnConfiguredProxyProvider() throws Exception {
-    Configuration confCopy = new Configuration(CONF);
-    confCopy.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX +
-        "." + NAMESERVICE, SpyConfiguredContextProxyProvider.class.getName());
-
-    try (DistributedFileSystem clearDfs =
-             (DistributedFileSystem) FileSystem.get(confCopy)) {
-      ClientGSIContext clientState = getContext(1);
-      assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE));
-      DFSTestUtil.writeFile(clearDfs, new Path("/testFileNoState"), "no_state");
-      assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE));
-    }
   }
 
   /**
@@ -236,51 +187,6 @@ public class TestStateAlignmentContextWithHA {
     }
   }
 
-  /**
-   * This test mocks an AlignmentContext and ensures that DFSClient
-   * writes its lastSeenStateId into RPC requests.
-   */
-  @Test
-  public void testClientSendsState() throws Exception {
-    ClientGSIContext alignmentContext = new ClientGSIContext();
-    ClientGSIContext spiedAlignContext = Mockito.spy(alignmentContext);
-    spy = spiedAlignContext;
-
-    try (DistributedFileSystem clearDfs =
-             (DistributedFileSystem) FileSystem.get(CONF)) {
-
-      // Collect RpcRequestHeaders for verification later.
-      final List<RpcHeaderProtos.RpcRequestHeaderProto.Builder> headers =
-          new ArrayList<>();
-      Mockito.doAnswer(new Answer() {
-        @Override
-        public Object answer(InvocationOnMock a) throws Throwable {
-          Object[] arguments = a.getArguments();
-          RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
-              (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
-          headers.add(header);
-          return a.callRealMethod();
-        }
-      }).when(spiedAlignContext).updateRequestState(Mockito.any());
-
-      DFSTestUtil.writeFile(clearDfs, new Path("/testFile4"), "shv");
-
-      // Ensure first header and last header have different state.
-      assertThat(headers.size() > 1, is(true));
-      assertThat(headers.get(0).getStateId(),
-          is(not(headers.get(headers.size() - 1))));
-
-      // Ensure collected RpcRequestHeaders are in increasing order.
-      long lastHeader = headers.get(0).getStateId();
-      for (RpcHeaderProtos.RpcRequestHeaderProto.Builder header :
-          headers.subList(1, headers.size())) {
-        long currentHeader = header.getStateId();
-        assertThat(currentHeader >= lastHeader, is(true));
-        lastHeader = header.getStateId();
-      }
-    }
-  }
-
   /**
  * This test checks that, after a client writes, we can see the state id
    * updated via the response.
@@ -316,14 +222,22 @@ public class TestStateAlignmentContextWithHA {
 
   @Test(timeout=300000)
   public void testMultiClientStatesWithRandomFailovers() throws Exception {
-    // We want threads to run during failovers; assuming at minimum 4 cores,
-    // would like to see 2 clients competing against 2 NameNodes.
+    // First run, half the load, with one failover.
+    runClientsWithFailover(1, NUMCLIENTS/2, NUMFILES/2);
+    // Second half, with fail back.
+    runClientsWithFailover(NUMCLIENTS/2 + 1, NUMCLIENTS, NUMFILES/2);
+  }
+
+  private void runClientsWithFailover(int clientStartId,
+                                      int numClients,
+                                      int numFiles)
+      throws Exception {
     ExecutorService execService = Executors.newFixedThreadPool(2);
-    clients = new ArrayList<>(NUMCLIENTS);
-    for (int i = 1; i <= NUMCLIENTS; i++) {
+    clients = new ArrayList<>(numClients);
+    for (int i = clientStartId; i <= numClients; i++) {
       DistributedFileSystem haClient =
           (DistributedFileSystem) FileSystem.get(CONF);
-      clients.add(new Worker(haClient, NUMFILES, "/testFile3FO_", i));
+      clients.add(new Worker(haClient, numFiles, "/testFile3FO_", i));
     }
 
     // Execute workers in threadpool with random failovers.
@@ -331,15 +245,18 @@ public class TestStateAlignmentContextWithHA {
     execService.shutdown();
 
     boolean finished = false;
+    failOver();
+
     while (!finished) {
-      failOver();
-      finished = execService.awaitTermination(1L, TimeUnit.SECONDS);
+      finished = execService.awaitTermination(20L, TimeUnit.SECONDS);
     }
 
     // Validation.
     for (Future<STATE> future : futures) {
       assertThat(future.get(), is(STATE.SUCCESS));
     }
+
+    clients.clear();
   }
 
   private ClientGSIContext getContext(int clientCreationIndex) {
@@ -347,7 +264,9 @@ public class TestStateAlignmentContextWithHA {
   }
 
   private void failOver() throws IOException {
+    LOG.info("Transitioning Active to Standby");
     cluster.transitionToStandby(active);
+    LOG.info("Transitioning Standby to Active");
     cluster.transitionToActive(standby);
     int tempActive = active;
     active = standby;
@@ -394,30 +313,36 @@ public class TestStateAlignmentContextWithHA {
 
     @Override
     public STATE call() {
+      int i = -1;
       try {
-        for (int i = 0; i < filesToMake; i++) {
-          long preClientStateFO =
-              getContext(nonce).getLastSeenStateId();
+        for (i = 0; i < filesToMake; i++) {
+          ClientGSIContext gsiContext = getContext(nonce);
+          long preClientStateFO = gsiContext.getLastSeenStateId();
 
           // Write using HA client.
-          Path path = new Path(filePath + nonce + i);
+          Path path = new Path(filePath + nonce + "_" + i);
           DFSTestUtil.writeFile(client, path, "erk");
 
-          long postClientStateFO =
-              getContext(nonce).getLastSeenStateId();
+          long postClientStateFO = gsiContext.getLastSeenStateId();
 
           // Write(s) should have increased state. Check for greater than.
-          if (postClientStateFO <= preClientStateFO) {
-            System.out.println("FAIL: Worker started with: " +
-                preClientStateFO + ", but finished with: " + postClientStateFO);
+          if (postClientStateFO < 0 || postClientStateFO <= preClientStateFO) {
+            LOG.error("FAIL: Worker started with: {} , but finished with: {}",
+                preClientStateFO, postClientStateFO);
             return STATE.FAIL;
           }
+
+          if(i % (NUMFILES/10) == 0) {
+            LOG.info("Worker {} created {} files", nonce, i);
+            LOG.info("LastSeenStateId = {}", postClientStateFO);
+          }
         }
-        client.close();
         return STATE.SUCCESS;
-      } catch (IOException e) {
-        System.out.println("ERROR: Worker failed with: " + e);
+      } catch (Exception e) {
+        LOG.error("ERROR: Worker failed with: ", e);
         return STATE.ERROR;
+      } finally {
+        LOG.info("Worker {} created {} files", nonce, i);
       }
     }
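
For context, the rewritten test now wires up the real ObserverReadProxyProvider the same way a production client would, through the dfs.client.failover.proxy.provider.<nameservice> key set in the setup hunk above. A hedged sketch of that client-side configuration (the helper class and nameservice string are illustrative, not part of the patch):

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;

public class ObserverReadClientSetup {
  // Returns a FileSystem whose reads may be served by observer NameNodes.
  public static FileSystem open(String nameservice, Configuration conf)
      throws IOException, URISyntaxException {
    conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX
        + "." + nameservice, ObserverReadProxyProvider.class.getName());
    return FileSystem.get(new URI("hdfs://" + nameservice), conf);
  }
}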
 

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java

@@ -1032,7 +1032,8 @@ public class TestQuorumJournalManager {
     cluster.getJournalNode(1).stopAndJoin(0);
 
     try {
-      qjm.selectInputStreams(new ArrayList<>(), 1, true, false);
+      qjm.selectInputStreams(new ArrayList<EditLogInputStream>(), 1,
+          true, false);
       fail("");
     } catch (QuorumException qe) {
       GenericTestUtils.assertExceptionContains(

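This two-line change is also a Java 7 fix: target typing for poly expressions came with Java 8, so under Java 7 a diamond in argument position infers ArrayList<Object>, which does not match the List<EditLogInputStream> parameter. A minimal illustration with a hypothetical method:

import java.util.ArrayList;
import java.util.List;

public class DiamondArgumentCompat {
  // Hypothetical stand-in for a method like selectInputStreams.
  static int takeAll(List<String> items) {
    return items.size();
  }

  public static void main(String[] args) {
    // Java 8+: the diamond infers ArrayList<String> from the parameter type.
    // takeAll(new ArrayList<>());

    // Java 7: no target typing in argument position; spell the type out.
    System.out.println(takeAll(new ArrayList<String>())); // prints 0
  }
}
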
+ 14 - 11
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java

@@ -368,19 +368,22 @@ public class TestObserverNode {
 
     // a status flag, initialized to 0, after reader finished, this will be
     // updated to 1, -1 on error
-    AtomicInteger readStatus = new AtomicInteger(0);
+    final AtomicInteger readStatus = new AtomicInteger(0);
 
     // create a separate thread to make a blocking read.
-    Thread reader = new Thread(() -> {
-      try {
-        // this read call will block until server state catches up. But due to
-        // configuration, this will take a very long time.
-        dfs.getClient().getFileInfo("/");
-        readStatus.set(1);
-        fail("Should have been interrupted before getting here.");
-      } catch (IOException e) {
-        e.printStackTrace();
-        readStatus.set(-1);
+    Thread reader = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          // this read call will block until server state catches up. But due to
+          // configuration, this will take a very long time.
+          dfs.getClient().getFileInfo("/");
+          readStatus.set(1);
+          fail("Should have been interrupted before getting here.");
+        } catch (IOException e) {
+          e.printStackTrace();
+          readStatus.set(-1);
+        }
       }
     });
     reader.start();
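
Two Java 7 constraints meet in this hunk: lambda syntax is Java 8 only, and a Java 7 anonymous class can only capture locals that are explicitly final (effectively-final capture also arrived with Java 8), hence the final added to readStatus. A minimal sketch of the pre-Java-8 idiom:

import java.util.concurrent.atomic.AtomicInteger;

public class AnonymousRunnableCompat {
  public static void main(String[] args) throws InterruptedException {
    // Must be declared final so the anonymous class can capture it on Java 7.
    final AtomicInteger status = new AtomicInteger(0);

    // Java 8+ equivalent: new Thread(() -> status.set(1));
    Thread worker = new Thread(new Runnable() {
      @Override
      public void run() {
        status.set(1);
      }
    });
    worker.start();
    worker.join();
    System.out.println("status = " + status.get()); // prints "status = 1"
  }
}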

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java

@@ -72,7 +72,7 @@ public class TestObserverReadProxyProvider {
     namenodeAddrs = new String[namenodeCount];
     namenodeAnswers = new ClientProtocolAnswer[namenodeCount];
     ClientProtocol[] proxies = new ClientProtocol[namenodeCount];
-    Map<String, ClientProtocol> proxyMap = new HashMap<>();
+    final Map<String, ClientProtocol> proxyMap = new HashMap<>();
     for (int i  = 0; i < namenodeCount; i++) {
       namenodeIDs[i] = "nn" + i;
       namenodeAddrs[i] = "namenode" + i + ".test:8020";

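This one-word change, like the two final keywords added in the next hunk, follows the same Java 7 capture rule: a local referenced from an anonymous inner class later in the method must be declared final. A generic sketch of the rule, independent of the Hadoop test code:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class FinalCaptureCompat {
  public static void main(String[] args) throws Exception {
    // Without final, Java 7 rejects the reference inside the anonymous
    // class below; Java 8 would accept it as effectively final.
    final List<String> remaining = new ArrayList<String>();
    remaining.add("file1");

    Callable<Boolean> done = new Callable<Boolean>() {
      @Override
      public Boolean call() {
        return remaining.isEmpty();
      }
    };
    System.out.println("done = " + done.call()); // prints "done = false"
  }
}
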
+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyInProgressTail.java

@@ -441,9 +441,9 @@ public class TestStandbyInProgressTail {
    * Wait up to 1 second until the given NameNode is aware of the existing of
    * all of the provided fileNames.
    */
-  private static void waitForFileInfo(NameNode standbyNN, String... fileNames)
+  private static void waitForFileInfo(final NameNode standbyNN, String... fileNames)
       throws Exception {
-    List<String> remainingFiles = Lists.newArrayList(fileNames);
+    final List<String> remainingFiles = Lists.newArrayList(fileNames);
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {