Bläddra i källkod

HDFS-14979 Allow Balancer to submit getBlocks calls to Observer Nodes when possible. Contributed by Erik Krogen.

Erik Krogen 5 år sedan
förälder
incheckning
586defe711

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
+import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly;
 import org.apache.hadoop.io.retry.AtMostOnce;
 import org.apache.hadoop.io.retry.Idempotent;
 import org.apache.hadoop.security.KerberosInfo;
@@ -78,6 +79,7 @@ public interface NamenodeProtocol {
   datanode does not exist
    */
   @Idempotent
+  @ReadOnly
   BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
       minBlockSize) throws IOException;
 

+ 21 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java

@@ -18,8 +18,15 @@
 package org.apache.hadoop.hdfs.server.balancer;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -33,6 +40,8 @@ import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
 import org.junit.Test;
@@ -128,12 +137,24 @@ public class TestBalancerWithHANameNodes {
       cluster = qjmhaCluster.getDfsCluster();
       cluster.waitClusterUp();
       cluster.waitActive();
+      List<FSNamesystem> namesystemSpies = new ArrayList<>();
+      for (int i = 0; i < cluster.getNumNameNodes(); i++) {
+        namesystemSpies.add(
+            NameNodeAdapter.spyOnNamesystem(cluster.getNameNode(i)));
+      }
 
       DistributedFileSystem dfs = HATestUtil.configureObserverReadFs(
           cluster, conf, ObserverReadProxyProvider.class, true);
       client = dfs.getClient().getNamenode();
 
       doTest(conf);
+      for (int i = 0; i < cluster.getNumNameNodes(); i++) {
+        // First observer node is at idx 2 so it should get both getBlocks calls
+        // all other NameNodes should see 0 getBlocks calls
+        int expectedCount = (i == 2) ? 2 : 0;
+        verify(namesystemSpies.get(i), times(expectedCount))
+            .getBlocks(any(), anyLong(), anyLong());
+      }
     } finally {
       if (qjmhaCluster != null) {
         qjmhaCluster.shutdown();