Browse Source

HDFS-14162. [SBN read] Allow Balancer to work with Observer node. Add a new ProxyCombiner allowing for multiple related protocols to be combined. Allow AlignmentContext to be passed in NameNodeProxyFactory. Contributed by Erik Krogen.

(cherry picked from 64f28f9efa2ef3cd9dd54a6c5009029721e030ed)
(cherry picked from 69b0c513a9b11cc7f795747732173b36aacbe794)
Erik Krogen 6 years ago
parent
commit
56caacac1f

+ 137 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProxyCombiner.java

@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.ipc.Client.ConnectionId;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A utility class used to combine two protocol proxies.
+ * See {@link #combine(Class, Object...)}.
+ */
+public final class ProxyCombiner {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ProxyCombiner.class);
+
+  private ProxyCombiner() { }
+
+  /**
+   * Combine two or more proxies which together comprise a single proxy
+   * interface. This can be used for a protocol interface which {@code extends}
+   * multiple other protocol interfaces. The returned proxy will implement
+   * all of the methods of the combined proxy interface, delegating calls
+   * to which proxy implements that method. If multiple proxies implement the
+   * same method, the first in the list will be used for delegation.
+   *
+   * <p/>This will check that every method on the combined interface is
+   * implemented by at least one of the supplied proxy objects.
+   *
+   * @param combinedProxyInterface The interface of the combined proxy.
+   * @param proxies The proxies which should be used as delegates.
+   * @param <T> The type of the proxy that will be returned.
+   * @return The combined proxy.
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T combine(Class<T> combinedProxyInterface,
+      Object... proxies) {
+    methodLoop:
+    for (Method m : combinedProxyInterface.getMethods()) {
+      for (Object proxy : proxies) {
+        try {
+          proxy.getClass().getMethod(m.getName(), m.getParameterTypes());
+          continue methodLoop; // go to the next method
+        } catch (NoSuchMethodException nsme) {
+          // Continue to try the next proxy
+        }
+      }
+      throw new IllegalStateException("The proxies specified for "
+          + combinedProxyInterface + " do not cover method " + m);
+    }
+
+    InvocationHandler handler = new CombinedProxyInvocationHandler(proxies);
+    return (T) Proxy.newProxyInstance(combinedProxyInterface.getClassLoader(),
+        new Class[] {combinedProxyInterface}, handler);
+  }
+
+  private static final class CombinedProxyInvocationHandler
+      implements RpcInvocationHandler {
+
+    private final Object[] proxies;
+
+    private CombinedProxyInvocationHandler(Object[] proxies) {
+      this.proxies = proxies;
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args)
+        throws Throwable {
+      Exception lastException = null;
+      for (Object underlyingProxy : proxies) {
+        try {
+          return method.invoke(underlyingProxy, args);
+        } catch (IllegalAccessException|IllegalArgumentException e) {
+          lastException = e;
+        }
+      }
+      // This shouldn't happen since the method coverage was verified in build()
+      LOG.error("BUG: Method {} was unable to be found on any of the "
+          + "underlying proxies for {}", method, proxy.getClass());
+      throw new IllegalArgumentException("Method " + method + " not supported",
+          lastException);
+    }
+
+    /**
+     * Since this is incapable of returning multiple connection IDs, simply
+     * return the first one. In most cases, the connection ID should be the same
+     * for all proxies.
+     */
+    @Override
+    public ConnectionId getConnectionId() {
+      return RPC.getConnectionIdForProxy(proxies[0]);
+    }
+
+    @Override
+    public void close() throws IOException {
+      MultipleIOException.Builder exceptionBuilder =
+          new MultipleIOException.Builder();
+      for (Object proxy : proxies) {
+        if (proxy instanceof Closeable) {
+          try {
+            ((Closeable) proxy).close();
+          } catch (IOException ioe) {
+            exceptionBuilder.add(ioe);
+          }
+        }
+      }
+      if (!exceptionBuilder.isEmpty()) {
+        throw exceptionBuilder.build();
+      }
+    }
+  }
+}

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAProxyFactory.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import java.io.IOException;
@@ -41,4 +42,12 @@ public interface HAProxyFactory<T> {
   T createProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
       UserGroupInformation ugi, boolean withRetries) throws IOException;
 
+  /**
+   * Set the alignment context to be used when creating new proxies using
+   * this factory. Not all implementations will use this alignment context.
+   */
+  default void setAlignmentContext(AlignmentContext alignmentContext) {
+    // noop
+  }
+
 }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java

@@ -156,7 +156,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
     super(conf, uri, xface, factory);
     this.failoverProxy = failoverProxy;
     this.alignmentContext = new ClientGSIContext();
-    ((ClientHAProxyFactory<T>) factory).setAlignmentContext(alignmentContext);
+    factory.setAlignmentContext(alignmentContext);
 
     // Don't bother configuring the number of retries and such on the retry
     // policy since it is mainly only used for determining whether or not an

+ 70 - 38
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java

@@ -37,13 +37,16 @@ import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
 import org.apache.hadoop.hdfs.server.namenode.ha.NameNodeHAProxyFactory;
+import org.apache.hadoop.hdfs.server.protocol.BalancerProtocols;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.ProxyCombiner;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
 import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolClientSideTranslatorPB;
@@ -118,7 +121,7 @@ public class NameNodeProxies {
     if (failoverProxyProvider == null) {
       return createNonHAProxy(conf, DFSUtilClient.getNNAddress(nameNodeUri),
           xface, UserGroupInformation.getCurrentUser(), true,
-          fallbackToSimpleAuth);
+          fallbackToSimpleAuth, null);
     } else {
       return NameNodeProxiesClient.createHAProxy(conf, nameNodeUri, xface,
           failoverProxyProvider);
@@ -141,7 +144,7 @@ public class NameNodeProxies {
   public static <T> ProxyAndInfo<T> createNonHAProxy(
       Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
       UserGroupInformation ugi, boolean withRetries) throws IOException {
-    return createNonHAProxy(conf, nnAddr, xface, ugi, withRetries, null);
+    return createNonHAProxy(conf, nnAddr, xface, ugi, withRetries, null, null);
   }
 
   /**
@@ -163,27 +166,36 @@ public class NameNodeProxies {
   public static <T> ProxyAndInfo<T> createNonHAProxy(
       Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
       UserGroupInformation ugi, boolean withRetries,
-      AtomicBoolean fallbackToSimpleAuth) throws IOException {
+      AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)
+      throws IOException {
     Text dtService = SecurityUtil.buildTokenService(nnAddr);
   
     T proxy;
     if (xface == ClientProtocol.class) {
-      proxy = (T) NameNodeProxiesClient.createNonHAProxyWithClientProtocol(
-          nnAddr, conf, ugi, withRetries, fallbackToSimpleAuth);
+      proxy = (T) NameNodeProxiesClient.createProxyWithAlignmentContext(
+          nnAddr, conf, ugi, withRetries, fallbackToSimpleAuth,
+          alignmentContext);
     } else if (xface == JournalProtocol.class) {
-      proxy = (T) createNNProxyWithJournalProtocol(nnAddr, conf, ugi);
+      proxy = (T) createNNProxyWithJournalProtocol(nnAddr, conf, ugi,
+          alignmentContext);
     } else if (xface == NamenodeProtocol.class) {
       proxy = (T) createNNProxyWithNamenodeProtocol(nnAddr, conf, ugi,
-          withRetries);
+          withRetries, alignmentContext);
     } else if (xface == GetUserMappingsProtocol.class) {
-      proxy = (T) createNNProxyWithGetUserMappingsProtocol(nnAddr, conf, ugi);
+      proxy = (T) createNNProxyWithGetUserMappingsProtocol(nnAddr, conf, ugi,
+          alignmentContext);
     } else if (xface == RefreshUserMappingsProtocol.class) {
-      proxy = (T) createNNProxyWithRefreshUserMappingsProtocol(nnAddr, conf, ugi);
+      proxy = (T) createNNProxyWithRefreshUserMappingsProtocol(nnAddr, conf,
+          ugi, alignmentContext);
     } else if (xface == RefreshAuthorizationPolicyProtocol.class) {
       proxy = (T) createNNProxyWithRefreshAuthorizationPolicyProtocol(nnAddr,
-          conf, ugi);
+          conf, ugi, alignmentContext);
     } else if (xface == RefreshCallQueueProtocol.class) {
-      proxy = (T) createNNProxyWithRefreshCallQueueProtocol(nnAddr, conf, ugi);
+      proxy = (T) createNNProxyWithRefreshCallQueueProtocol(nnAddr, conf, ugi,
+          alignmentContext);
+    } else if (xface == BalancerProtocols.class) {
+      proxy = (T) createNNProxyWithBalancerProtocol(nnAddr, conf, ugi,
+          withRetries, fallbackToSimpleAuth, alignmentContext);
     } else {
       String message = "Unsupported protocol found when creating the proxy " +
           "connection to NameNode: " +
@@ -194,52 +206,57 @@ public class NameNodeProxies {
 
     return new ProxyAndInfo<T>(proxy, dtService, nnAddr);
   }
-  
+
   private static JournalProtocol createNNProxyWithJournalProtocol(
-      InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
-      throws IOException {
-    JournalProtocolPB proxy = (JournalProtocolPB) createNameNodeProxy(address,
-        conf, ugi, JournalProtocolPB.class, 30000);
+      InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
+      AlignmentContext alignmentContext) throws IOException {
+    JournalProtocolPB proxy = createNameNodeProxy(address,
+        conf, ugi, JournalProtocolPB.class, 30000, alignmentContext);
     return new JournalProtocolTranslatorPB(proxy);
   }
 
   private static RefreshAuthorizationPolicyProtocol
       createNNProxyWithRefreshAuthorizationPolicyProtocol(InetSocketAddress address,
-          Configuration conf, UserGroupInformation ugi) throws IOException {
-    RefreshAuthorizationPolicyProtocolPB proxy = (RefreshAuthorizationPolicyProtocolPB)
-        createNameNodeProxy(address, conf, ugi, RefreshAuthorizationPolicyProtocolPB.class, 0);
+      Configuration conf, UserGroupInformation ugi,
+      AlignmentContext alignmentContext) throws IOException {
+    RefreshAuthorizationPolicyProtocolPB proxy = createNameNodeProxy(address,
+        conf, ugi, RefreshAuthorizationPolicyProtocolPB.class, 0,
+        alignmentContext);
     return new RefreshAuthorizationPolicyProtocolClientSideTranslatorPB(proxy);
   }
   
   private static RefreshUserMappingsProtocol
       createNNProxyWithRefreshUserMappingsProtocol(InetSocketAddress address,
-          Configuration conf, UserGroupInformation ugi) throws IOException {
-    RefreshUserMappingsProtocolPB proxy = (RefreshUserMappingsProtocolPB)
-        createNameNodeProxy(address, conf, ugi, RefreshUserMappingsProtocolPB.class, 0);
+      Configuration conf, UserGroupInformation ugi,
+      AlignmentContext alignmentContext) throws IOException {
+    RefreshUserMappingsProtocolPB proxy = createNameNodeProxy(address, conf,
+        ugi, RefreshUserMappingsProtocolPB.class, 0, alignmentContext);
     return new RefreshUserMappingsProtocolClientSideTranslatorPB(proxy);
   }
 
   private static RefreshCallQueueProtocol
       createNNProxyWithRefreshCallQueueProtocol(InetSocketAddress address,
-          Configuration conf, UserGroupInformation ugi) throws IOException {
-    RefreshCallQueueProtocolPB proxy = (RefreshCallQueueProtocolPB)
-        createNameNodeProxy(address, conf, ugi, RefreshCallQueueProtocolPB.class, 0);
+      Configuration conf, UserGroupInformation ugi,
+      AlignmentContext alignmentContext) throws IOException {
+    RefreshCallQueueProtocolPB proxy = createNameNodeProxy(address, conf, ugi,
+        RefreshCallQueueProtocolPB.class, 0, alignmentContext);
     return new RefreshCallQueueProtocolClientSideTranslatorPB(proxy);
   }
 
   private static GetUserMappingsProtocol createNNProxyWithGetUserMappingsProtocol(
-      InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
-      throws IOException {
-    GetUserMappingsProtocolPB proxy = (GetUserMappingsProtocolPB)
-        createNameNodeProxy(address, conf, ugi, GetUserMappingsProtocolPB.class, 0);
+      InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
+      AlignmentContext alignmentContext) throws IOException {
+    GetUserMappingsProtocolPB proxy = createNameNodeProxy(address, conf, ugi,
+        GetUserMappingsProtocolPB.class, 0, alignmentContext);
     return new GetUserMappingsProtocolClientSideTranslatorPB(proxy);
   }
   
   private static NamenodeProtocol createNNProxyWithNamenodeProtocol(
       InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
-      boolean withRetries) throws IOException {
-    NamenodeProtocolPB proxy = (NamenodeProtocolPB) createNameNodeProxy(
-        address, conf, ugi, NamenodeProtocolPB.class, 0);
+      boolean withRetries, AlignmentContext alignmentContext)
+      throws IOException {
+    NamenodeProtocolPB proxy = createNameNodeProxy(
+        address, conf, ugi, NamenodeProtocolPB.class, 0, alignmentContext);
     if (withRetries) { // create the proxy with retries
       RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(5, 200,
               TimeUnit.MILLISECONDS);
@@ -256,13 +273,28 @@ public class NameNodeProxies {
     }
   }
 
-  private static Object createNameNodeProxy(InetSocketAddress address,
-      Configuration conf, UserGroupInformation ugi, Class<?> xface,
-      int rpcTimeout) throws IOException {
+  private static BalancerProtocols createNNProxyWithBalancerProtocol(
+      InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
+      boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
+      AlignmentContext alignmentContext) throws IOException {
+    NamenodeProtocol namenodeProtocol = createNNProxyWithNamenodeProtocol(
+        address, conf, ugi, withRetries, alignmentContext);
+    ClientProtocol clientProtocol =
+        NameNodeProxiesClient.createProxyWithAlignmentContext(address,
+            conf, ugi, withRetries, fallbackToSimpleAuth, alignmentContext);
+
+    return ProxyCombiner.combine(BalancerProtocols.class,
+        namenodeProtocol, clientProtocol);
+  }
+
+  private static <T> T createNameNodeProxy(InetSocketAddress address,
+      Configuration conf, UserGroupInformation ugi, Class<T> xface,
+      int rpcTimeout, AlignmentContext alignmentContext) throws IOException {
     RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class);
-    Object proxy = RPC.getProxy(xface, RPC.getProtocolVersion(xface), address,
-        ugi, conf, NetUtils.getDefaultSocketFactory(conf), rpcTimeout);
-    return proxy;
+    return RPC.getProtocolProxy(xface,
+        RPC.getProtocolVersion(xface), address, ugi, conf,
+        NetUtils.getDefaultSocketFactory(conf), rpcTimeout, null, null,
+        alignmentContext).getProxy();
   }
 
 }

+ 4 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java

@@ -43,11 +43,11 @@ import org.apache.hadoop.fs.StreamCapabilities.StreamCapability;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
+import org.apache.hadoop.hdfs.server.protocol.BalancerProtocols;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@@ -104,8 +104,7 @@ public class NameNodeConnector implements Closeable {
   private final URI nameNodeUri;
   private final String blockpoolID;
 
-  private final NamenodeProtocol namenode;
-  private final ClientProtocol client;
+  private final BalancerProtocols namenode;
   private final KeyManager keyManager;
   final AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
 
@@ -129,9 +128,7 @@ public class NameNodeConnector implements Closeable {
     this.maxNotChangedIterations = maxNotChangedIterations;
 
     this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
-        NamenodeProtocol.class).getProxy();
-    this.client = NameNodeProxies.createProxy(conf, nameNodeUri,
-        ClientProtocol.class, fallbackToSimpleAuth).getProxy();
+        BalancerProtocols.class, fallbackToSimpleAuth).getProxy();
     this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf);
 
     final NamespaceInfo namespaceinfo = namenode.versionRequest();
@@ -185,7 +182,7 @@ public class NameNodeConnector implements Closeable {
   /** @return live datanode storage reports. */
   public DatanodeStorageReport[] getLiveDatanodeStorageReport()
       throws IOException {
-    return client.getDatanodeStorageReport(DatanodeReportType.LIVE);
+    return namenode.getDatanodeStorageReport(DatanodeReportType.LIVE);
   }
 
   /** @return the key manager */

+ 8 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/NameNodeHAProxyFactory.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import java.io.IOException;
@@ -27,12 +28,14 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 public class NameNodeHAProxyFactory<T> implements HAProxyFactory<T> {
 
+  private AlignmentContext alignmentContext;
+
   @Override
   public T createProxy(Configuration conf, InetSocketAddress nnAddr,
       Class<T> xface, UserGroupInformation ugi, boolean withRetries,
       AtomicBoolean fallbackToSimpleAuth) throws IOException {
     return NameNodeProxies.createNonHAProxy(conf, nnAddr, xface,
-      ugi, withRetries, fallbackToSimpleAuth).getProxy();
+        ugi, withRetries, fallbackToSimpleAuth, alignmentContext).getProxy();
   }
 
   @Override
@@ -42,4 +45,8 @@ public class NameNodeHAProxyFactory<T> implements HAProxyFactory<T> {
     return NameNodeProxies.createNonHAProxy(conf, nnAddr, xface,
       ugi, withRetries).getProxy();
   }
+
+  public void setAlignmentContext(AlignmentContext alignmentContext) {
+    this.alignmentContext = alignmentContext;
+  }
 }

+ 30 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BalancerProtocols.java

@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.security.KerberosInfo;
+
+
+/** The full set of protocols used by the Balancer. */
+@InterfaceAudience.Private
+@KerberosInfo(
+    serverPrincipal = DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY)
+public interface BalancerProtocols extends ClientProtocol, NamenodeProtocol { }

+ 68 - 33
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java

@@ -18,14 +18,13 @@
 package org.apache.hadoop.hdfs.server.balancer;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 import java.net.URI;
 import java.util.Collection;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
@@ -33,7 +32,9 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
 import org.junit.Test;
 
 /**
@@ -43,6 +44,13 @@ public class TestBalancerWithHANameNodes {
   private MiniDFSCluster cluster;
   ClientProtocol client;
 
+  // array of racks for original nodes in cluster
+  private static final String[] TEST_RACKS =
+      {TestBalancer.RACK0, TestBalancer.RACK1};
+  // array of capacities for original nodes in cluster
+  private static final long[] TEST_CAPACITIES =
+      {TestBalancer.CAPACITY, TestBalancer.CAPACITY};
+
   static {
     TestBalancer.initTestSetup();
   }
@@ -57,52 +65,79 @@ public class TestBalancerWithHANameNodes {
   public void testBalancerWithHANameNodes() throws Exception {
     Configuration conf = new HdfsConfiguration();
     TestBalancer.initConf(conf);
-    long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity
-    String newNodeRack = TestBalancer.RACK2; // new node's rack
-    // array of racks for original nodes in cluster
-    String[] racks = new String[] { TestBalancer.RACK0, TestBalancer.RACK1 };
-    // array of capacities of original nodes in cluster
-    long[] capacities = new long[] { TestBalancer.CAPACITY,
-        TestBalancer.CAPACITY };
-    assertEquals(capacities.length, racks.length);
-    int numOfDatanodes = capacities.length;
+    assertEquals(TEST_CAPACITIES.length, TEST_RACKS.length);
     NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1");
     nn1Conf.setIpcPort(HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
     Configuration copiedConf = new Configuration(conf);
     cluster = new MiniDFSCluster.Builder(copiedConf)
         .nnTopology(MiniDFSNNTopology.simpleHATopology())
-        .numDataNodes(capacities.length)
-        .racks(racks)
-        .simulatedCapacities(capacities)
+        .numDataNodes(TEST_CAPACITIES.length)
+        .racks(TEST_RACKS)
+        .simulatedCapacities(TEST_CAPACITIES)
         .build();
     HATestUtil.setFailoverConfigurations(cluster, conf);
     try {
       cluster.waitActive();
-      cluster.transitionToActive(1);
+      cluster.transitionToActive(0);
       Thread.sleep(500);
       client = NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf),
           ClientProtocol.class).getProxy();
-      long totalCapacity = TestBalancer.sum(capacities);
-      // fill up the cluster to be 30% full
-      long totalUsedSpace = totalCapacity * 3 / 10;
-      TestBalancer.createFile(cluster, TestBalancer.filePath, totalUsedSpace
-          / numOfDatanodes, (short) numOfDatanodes, 1);
 
-      // start up an empty node with the same capacity and on the same rack
-      cluster.startDataNodes(conf, 1, true, null, new String[] { newNodeRack },
-          new long[] { newNodeCapacity });
-      totalCapacity += newNodeCapacity;
-      TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
-          cluster);
-      Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
-      assertEquals(1, namenodes.size());
-      assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster)));
-      final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
-      assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
-      TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
-          cluster, BalancerParameters.DEFAULT);
+      doTest(conf);
     } finally {
       cluster.shutdown();
     }
   }
+
+  void doTest(Configuration conf) throws Exception {
+    int numOfDatanodes = TEST_CAPACITIES.length;
+    long totalCapacity = TestBalancer.sum(TEST_CAPACITIES);
+    // fill up the cluster to be 30% full
+    long totalUsedSpace = totalCapacity * 3 / 10;
+    TestBalancer.createFile(cluster, TestBalancer.filePath, totalUsedSpace
+        / numOfDatanodes, (short) numOfDatanodes, 0);
+
+    // start up an empty node with the same capacity and on the same rack
+    long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity
+    String newNodeRack = TestBalancer.RACK2; // new node's rack
+    cluster.startDataNodes(conf, 1, true, null, new String[] {newNodeRack},
+        new long[] {newNodeCapacity});
+    totalCapacity += newNodeCapacity;
+    TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
+        cluster);
+    Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
+    assertEquals(1, namenodes.size());
+    final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
+    assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
+    TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
+        cluster, BalancerParameters.DEFAULT);
+  }
+
+  /**
+   * Test Balancer with ObserverNodes.
+   */
+  @Test(timeout = 60000)
+  public void testBalancerWithObserver() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    TestBalancer.initConf(conf);
+
+    MiniQJMHACluster qjmhaCluster = null;
+    try {
+      qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 2,
+          TEST_CAPACITIES.length, true, TEST_CAPACITIES, TEST_RACKS);
+      cluster = qjmhaCluster.getDfsCluster();
+      cluster.waitClusterUp();
+      cluster.waitActive();
+
+      DistributedFileSystem dfs = HATestUtil.configureObserverReadFs(
+          cluster, conf, ObserverReadProxyProvider.class, true);
+      client = dfs.getClient().getNamenode();
+
+      doTest(conf);
+    } finally {
+      if (qjmhaCluster != null) {
+        qjmhaCluster.shutdown();
+      }
+    }
+  }
 }

+ 11 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java

@@ -209,6 +209,14 @@ public abstract class HATestUtil {
   public static MiniQJMHACluster setUpObserverCluster(
       Configuration conf, int numObservers, int numDataNodes,
       boolean fastTailing) throws IOException {
+    return setUpObserverCluster(conf, numObservers, numDataNodes,
+        fastTailing, null, null);
+  }
+
+  public static MiniQJMHACluster setUpObserverCluster(
+      Configuration conf, int numObservers, int numDataNodes,
+      boolean fastTailing, long[] simulatedCapacities,
+      String[] racks) throws IOException {
     // disable block scanner
     conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
 
@@ -224,7 +232,9 @@ public abstract class HATestUtil {
 
     MiniQJMHACluster.Builder qjmBuilder = new MiniQJMHACluster.Builder(conf)
         .setNumNameNodes(2 + numObservers);
-    qjmBuilder.getDfsBuilder().numDataNodes(numDataNodes);
+    qjmBuilder.getDfsBuilder().numDataNodes(numDataNodes)
+        .simulatedCapacities(simulatedCapacities)
+        .racks(racks);
     MiniQJMHACluster qjmhaCluster = qjmBuilder.build();
     MiniDFSCluster dfsCluster = qjmhaCluster.getDfsCluster();