Explorar o código

HDFS-15767. RBF: Router federation rename of directory. Contributed by Jinglun.

Ayush Saxena %!s(int64=4) %!d(string=hai) anos
pai
achega
e40f99f6d5
Modificáronse 16 ficheiros con 981 adicións e 32 borrados
  1. 25 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml
  2. 10 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java
  3. 15 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RouterMBean.java
  4. 27 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
  5. 17 12
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
  6. 202 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java
  7. 86 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
  8. 59 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
  9. 16 1
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
  10. 28 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
  11. 455 0
      hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederationRename.java
  12. 8 0
      hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java
  13. 5 5
      hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java
  14. 25 12
      hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java
  15. 1 0
      hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceProcedureScheduler.java
  16. 2 2
      hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java

+ 25 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml

@@ -96,6 +96,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <scope>test</scope>
       <type>test-jar</type>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-distcp</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
@@ -115,6 +120,26 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <scope>test</scope>
       <type>test-jar</type>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-app</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-hs</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.curator</groupId>
       <artifactId>curator-test</artifactId>

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java

@@ -697,6 +697,16 @@ public class RBFMetrics implements RouterMBean, FederationMBean {
         MembershipStats::getHighestPriorityLowRedundancyECBlocks);
   }
 
+  @Override
+  public int getRouterFederationRenameCount() {
+    return this.router.getRpcServer().getRouterFederationRenameCount();
+  }
+
+  @Override
+  public int getSchedulerJobCount() {
+    return this.router.getRpcServer().getSchedulerJobCount();
+  }
+
   @Override
   public String getSafemode() {
     if (this.router.isRouterState(RouterServiceState.SAFEMODE)) {

+ 15 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RouterMBean.java

@@ -108,4 +108,19 @@ public interface RouterMBean {
    * @return Json string of owners to token counts
    */
   String getTopTokenRealOwners();
+
+  /**
+   * Gets the count of the currently running router federation rename jobs.
+   *
+   * @return the count of the currently running router federation rename jobs.
+   */
+  int getRouterFederationRenameCount();
+
+  /**
+   * Gets the count of the currently running jobs in the scheduler. It includes
+   * both the submitted and the recovered jobs.
+   *
+   * @return the count of the currently running jobs in the scheduler.
+   */
+  int getSchedulerJobCount();
 }

+ 27 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java

@@ -348,4 +348,31 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
       NoRouterRpcFairnessPolicyController.class;
   public static final String DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX =
       FEDERATION_ROUTER_FAIRNESS_PREFIX + "handler.count.";
+
+  // HDFS Router Federation Rename.
+  public static final String DFS_ROUTER_FEDERATION_RENAME_PREFIX =
+      FEDERATION_ROUTER_PREFIX + "federation.rename.";
+  public static final String DFS_ROUTER_FEDERATION_RENAME_OPTION =
+      DFS_ROUTER_FEDERATION_RENAME_PREFIX + "option";
+  public static final String DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT =
+      "NONE";
+  public static final String
+      DFS_ROUTER_FEDERATION_RENAME_FORCE_CLOSE_OPEN_FILE =
+      DFS_ROUTER_FEDERATION_RENAME_PREFIX + "force.close.open.file";
+  public static final boolean
+      DFS_ROUTER_FEDERATION_RENAME_FORCE_CLOSE_OPEN_FILE_DEFAULT = true;
+  public static final String DFS_ROUTER_FEDERATION_RENAME_MAP =
+      DFS_ROUTER_FEDERATION_RENAME_PREFIX + "map";
+  public static final String DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH =
+      DFS_ROUTER_FEDERATION_RENAME_PREFIX + "bandwidth";
+  public static final String DFS_ROUTER_FEDERATION_RENAME_DELAY =
+      DFS_ROUTER_FEDERATION_RENAME_PREFIX + "delay";
+  public static final long DFS_ROUTER_FEDERATION_RENAME_DELAY_DEFAULT = 1000;
+  public static final String DFS_ROUTER_FEDERATION_RENAME_DIFF =
+      DFS_ROUTER_FEDERATION_RENAME_PREFIX + "diff";
+  public static final int DFS_ROUTER_FEDERATION_RENAME_DIFF_DEFAULT = 0;
+  public static final String DFS_ROUTER_FEDERATION_RENAME_TRASH =
+      DFS_ROUTER_FEDERATION_RENAME_PREFIX + "trash";
+  public static final String DFS_ROUTER_FEDERATION_RENAME_TRASH_DEFAULT =
+      "trash";
 }

+ 17 - 12
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java

@@ -126,6 +126,7 @@ public class RouterClientProtocol implements ClientProtocol {
 
   private final RouterRpcServer rpcServer;
   private final RouterRpcClient rpcClient;
+  private final RouterFederationRename rbfRename;
   private final FileSubclusterResolver subclusterResolver;
   private final ActiveNamenodeResolver namenodeResolver;
 
@@ -191,6 +192,7 @@ public class RouterClientProtocol implements ClientProtocol {
     this.snapshotProto = new RouterSnapshot(rpcServer);
     this.routerCacheAdmin = new RouterCacheAdmin(rpcServer);
     this.securityManager = rpcServer.getRouterSecurityManager();
+    this.rbfRename = new RouterFederationRename(rpcServer, conf);
   }
 
   @Override
@@ -594,13 +596,13 @@ public class RouterClientProtocol implements ClientProtocol {
 
     final List<RemoteLocation> srcLocations =
         rpcServer.getLocationsForPath(src, true, false);
+    final List<RemoteLocation> dstLocations =
+        rpcServer.getLocationsForPath(dst, false, false);
     // srcLocations may be trimmed by getRenameDestinations()
     final List<RemoteLocation> locs = new LinkedList<>(srcLocations);
-    RemoteParam dstParam = getRenameDestinations(locs, dst);
+    RemoteParam dstParam = getRenameDestinations(locs, dstLocations);
     if (locs.isEmpty()) {
-      throw new IOException(
-          "Rename of " + src + " to " + dst + " is not allowed," +
-              " no eligible destination in the same namespace was found.");
+      return rbfRename.routerFedRename(src, dst, srcLocations, dstLocations);
     }
     RemoteMethod method = new RemoteMethod("rename",
         new Class<?>[] {String.class, String.class},
@@ -620,13 +622,14 @@ public class RouterClientProtocol implements ClientProtocol {
 
     final List<RemoteLocation> srcLocations =
         rpcServer.getLocationsForPath(src, true, false);
+    final List<RemoteLocation> dstLocations =
+        rpcServer.getLocationsForPath(dst, false, false);
     // srcLocations may be trimmed by getRenameDestinations()
     final List<RemoteLocation> locs = new LinkedList<>(srcLocations);
-    RemoteParam dstParam = getRenameDestinations(locs, dst);
+    RemoteParam dstParam = getRenameDestinations(locs, dstLocations);
     if (locs.isEmpty()) {
-      throw new IOException(
-          "Rename of " + src + " to " + dst + " is not allowed," +
-              " no eligible destination in the same namespace was found.");
+      rbfRename.routerFedRename(src, dst, srcLocations, dstLocations);
+      return;
     }
     RemoteMethod method = new RemoteMethod("rename2",
         new Class<?>[] {String.class, String.class, options.getClass()},
@@ -1821,11 +1824,9 @@ public class RouterClientProtocol implements ClientProtocol {
    * @throws IOException If the dst paths could not be determined.
    */
   private RemoteParam getRenameDestinations(
-      final List<RemoteLocation> srcLocations, final String dst)
-      throws IOException {
+      final List<RemoteLocation> srcLocations,
+      final List<RemoteLocation> dstLocations) throws IOException {
 
-    final List<RemoteLocation> dstLocations =
-        rpcServer.getLocationsForPath(dst, false, false);
     final Map<RemoteLocation, String> dstMap = new HashMap<>();
 
     Iterator<RemoteLocation> iterator = srcLocations.iterator();
@@ -2203,4 +2204,8 @@ public class RouterClientProtocol implements ClientProtocol {
     }
     return false;
   }
+
+  public int getRouterFederationRenameCount() {
+    return rbfRename.getRouterFederationRenameCount();
+  }
 }

+ 202 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java

@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.tools.fedbalance.DistCpProcedure;
+import org.apache.hadoop.tools.fedbalance.FedBalanceConfigs;
+import org.apache.hadoop.tools.fedbalance.FedBalanceContext;
+import org.apache.hadoop.tools.fedbalance.TrashProcedure;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceJob;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_FORCE_CLOSE_OPEN_FILE;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_FORCE_CLOSE_OPEN_FILE_DEFAULT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_MAP;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_DELAY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_DELAY_DEFAULT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_DIFF;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_DIFF_DEFAULT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_TRASH;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_TRASH_DEFAULT;
+import static org.apache.hadoop.tools.fedbalance.FedBalance.DISTCP_PROCEDURE;
+import static org.apache.hadoop.tools.fedbalance.FedBalance.TRASH_PROCEDURE;
+import static org.apache.hadoop.tools.fedbalance.FedBalance.NO_MOUNT;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Rename across router based federation namespaces.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class RouterFederationRename {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterFederationRename.class.getName());
+  private final RouterRpcServer rpcServer;
+  private final Configuration conf;
+  private final AtomicInteger routerRenameCounter = new AtomicInteger();
+  public enum RouterRenameOption {
+    NONE, DISTCP
+  }
+
+  public RouterFederationRename(RouterRpcServer rpcServer, Configuration conf) {
+    this.rpcServer = rpcServer;
+    this.conf = conf;
+  }
+
+  /**
+   * Router federation rename across namespaces.
+   *
+   * @param src the source path. There is no mount point under the src path.
+   * @param dst the dst path.
+   * @param srcLocations the remote locations of src.
+   * @param dstLocations the remote locations of dst.
+   * @throws IOException if rename fails.
+   * @return true if rename succeeds.
+   */
+  boolean routerFedRename(final String src, final String dst,
+      final List<RemoteLocation> srcLocations,
+      final List<RemoteLocation> dstLocations) throws IOException {
+    if (!rpcServer.isEnableRenameAcrossNamespace()) {
+      throw new IOException("Rename of " + src + " to " + dst
+          + " is not allowed, no eligible destination in the same namespace was"
+          + " found");
+    }
+    if (srcLocations.size() != 1 || dstLocations.size() != 1) {
+      throw new IOException("Rename of " + src + " to " + dst + " is not"
+          + " allowed. The remote location should be exactly one.");
+    }
+    RemoteLocation srcLoc = srcLocations.get(0);
+    RemoteLocation dstLoc = dstLocations.get(0);
+    // Build and submit router federation rename job.
+    BalanceJob job = buildRouterRenameJob(srcLoc.getNameserviceId(),
+        dstLoc.getNameserviceId(), srcLoc.getDest(), dstLoc.getDest());
+    BalanceProcedureScheduler scheduler = rpcServer.getFedRenameScheduler();
+    countIncrement();
+    try {
+      scheduler.submit(job);
+      LOG.info("Rename {} to {} from namespace {} to {}. JobId={}.", src, dst,
+          srcLoc.getNameserviceId(), dstLoc.getNameserviceId(), job.getId());
+      scheduler.waitUntilDone(job);
+      if (job.getError() != null) {
+        throw new IOException("Rename of " + src + " to " + dst + " failed.",
+            job.getError());
+      }
+      return true;
+    } finally {
+      countDecrement();
+    }
+  }
+
+  /**
+   * Build router federation rename job moving data from src to dst.
+   * @param srcNs the source namespace id.
+   * @param dstNs the dst namespace id.
+   * @param src the source path.
+   * @param dst the dst path.
+   */
+  private BalanceJob buildRouterRenameJob(String srcNs, String dstNs,
+      String src, String dst) throws IOException {
+    checkConfiguration(conf);
+    Path srcPath = new Path("hdfs://" + srcNs + src);
+    Path dstPath = new Path("hdfs://" + dstNs + dst);
+    boolean forceCloseOpen =
+        conf.getBoolean(DFS_ROUTER_FEDERATION_RENAME_FORCE_CLOSE_OPEN_FILE,
+            DFS_ROUTER_FEDERATION_RENAME_FORCE_CLOSE_OPEN_FILE_DEFAULT);
+    int map = conf.getInt(DFS_ROUTER_FEDERATION_RENAME_MAP, -1);
+    int bandwidth = conf.getInt(DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH, -1);
+    long delay = conf.getLong(DFS_ROUTER_FEDERATION_RENAME_DELAY,
+        DFS_ROUTER_FEDERATION_RENAME_DELAY_DEFAULT);
+    int diff = conf.getInt(DFS_ROUTER_FEDERATION_RENAME_DIFF,
+        DFS_ROUTER_FEDERATION_RENAME_DIFF_DEFAULT);
+    String trashPolicy = conf.get(DFS_ROUTER_FEDERATION_RENAME_TRASH,
+        DFS_ROUTER_FEDERATION_RENAME_TRASH_DEFAULT);
+    FedBalanceConfigs.TrashOption trashOpt =
+        FedBalanceConfigs.TrashOption.valueOf(trashPolicy.toUpperCase());
+    // Construct job context.
+    FedBalanceContext context =
+        new FedBalanceContext.Builder(srcPath, dstPath, NO_MOUNT, conf)
+            .setForceCloseOpenFiles(forceCloseOpen)
+            .setUseMountReadOnly(true)
+            .setMapNum(map)
+            .setBandwidthLimit(bandwidth)
+            .setTrash(trashOpt)
+            .setDelayDuration(delay)
+            .setDiffThreshold(diff)
+            .build();
+
+    LOG.info(context.toString());
+    // Construct the balance job.
+    BalanceJob.Builder<BalanceProcedure> builder = new BalanceJob.Builder<>();
+    DistCpProcedure dcp =
+        new DistCpProcedure(DISTCP_PROCEDURE, null, delay, context);
+    builder.nextProcedure(dcp);
+    TrashProcedure tp =
+        new TrashProcedure(TRASH_PROCEDURE, null, delay, context);
+    builder.nextProcedure(tp);
+    return builder.build();
+  }
+
+  public int getRouterFederationRenameCount() {
+    return routerRenameCounter.get();
+  }
+
+  void countIncrement() {
+    routerRenameCounter.incrementAndGet();
+  }
+
+  void countDecrement() {
+    routerRenameCounter.decrementAndGet();
+  }
+
+  static void checkConfiguration(Configuration conf) throws IOException {
+    int map = conf.getInt(DFS_ROUTER_FEDERATION_RENAME_MAP, -1);
+    int bandwidth = conf.getInt(DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH, -1);
+    long delay = conf.getLong(DFS_ROUTER_FEDERATION_RENAME_DELAY,
+        DFS_ROUTER_FEDERATION_RENAME_DELAY_DEFAULT);
+    int diff = conf.getInt(DFS_ROUTER_FEDERATION_RENAME_DIFF,
+        DFS_ROUTER_FEDERATION_RENAME_DIFF_DEFAULT);
+    if (map < 0) {
+      throw new IOException("map=" + map + " is negative. Please check "
+          + DFS_ROUTER_FEDERATION_RENAME_MAP);
+    } else if (bandwidth < 0) {
+      throw new IOException(
+          "bandwidth=" + bandwidth + " is negative. Please check "
+              + DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH);
+    } else if (delay < 0) {
+      throw new IOException("delay=" + delay + " is negative. Please check "
+          + DFS_ROUTER_FEDERATION_RENAME_DELAY);
+    } else if (diff < 0) {
+      throw new IOException("diff=" + diff + " is negative. Please check "
+          + DFS_ROUTER_FEDERATION_RENAME_DIFF);
+    }
+  }
+}

+ 86 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

@@ -28,12 +28,18 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_KEY;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE_MS_DEFAULT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT;
+import static org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename.RouterRenameOption;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Array;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
@@ -50,6 +56,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
 import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
 import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
@@ -165,6 +173,7 @@ import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolServerSi
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
+import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler;
 import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos;
 import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
 import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
@@ -238,6 +247,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
   /** DN type -> full DN report. */
   private final LoadingCache<DatanodeReportType, DatanodeInfo[]> dnCache;
 
+  /** Specify the option of router federation rename. */
+  private RouterRenameOption routerRenameOption;
+  /** Schedule the router federation rename jobs. */
+  private BalanceProcedureScheduler fedRenameScheduler;
   /**
    * Construct a router RPC server.
    *
@@ -397,6 +410,57 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
                 .forEach((key) -> this.dnCache.refresh(key)),
             0,
             dnCacheExpire, TimeUnit.MILLISECONDS);
+    initRouterFedRename();
+  }
+
+  /**
+   * Init the router federation rename environment. Each router has its own
+   * journal path.
+   * In HA mode the journal path is:
+   *   JOURNAL_BASE/nsId/namenodeId
+   * e.g.
+   *   /journal/router-namespace/host0
+   * In non-ha mode the journal path is based on ip and port:
+   *   JOURNAL_BASE/host_port
+   * e.g.
+   *   /journal/0.0.0.0_8888
+   */
+  private void initRouterFedRename() throws IOException {
+    routerRenameOption = RouterRenameOption.valueOf(
+        conf.get(DFS_ROUTER_FEDERATION_RENAME_OPTION,
+            DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT).toUpperCase());
+    switch (routerRenameOption) {
+    case DISTCP:
+      RouterFederationRename.checkConfiguration(conf);
+      Configuration sConf = new Configuration(conf);
+      URI journalUri;
+      try {
+        journalUri = new URI(sConf.get(SCHEDULER_JOURNAL_URI));
+      } catch (URISyntaxException e) {
+        throw new IOException("Bad journal uri. Please check configuration for "
+            + SCHEDULER_JOURNAL_URI);
+      }
+      Path child;
+      String nsId = DFSUtil.getNamenodeNameServiceId(conf);
+      String namenodeId = HAUtil.getNameNodeId(conf, nsId);
+      InetSocketAddress listenAddress = this.rpcServer.getListenerAddress();
+      if (nsId == null || namenodeId == null) {
+        child = new Path(
+            listenAddress.getHostName() + "_" + listenAddress.getPort());
+      } else {
+        child = new Path(nsId, namenodeId);
+      }
+      String routerJournal = new Path(journalUri.toString(), child).toString();
+      sConf.set(SCHEDULER_JOURNAL_URI, routerJournal);
+      fedRenameScheduler = new BalanceProcedureScheduler(sConf);
+      fedRenameScheduler.init(true);
+      break;
+    case NONE:
+      fedRenameScheduler = null;
+      break;
+    default:
+      break;
+    }
   }
 
   @Override
@@ -432,9 +496,20 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     if (securityManager != null) {
       this.securityManager.stop();
     }
+    if (this.fedRenameScheduler != null) {
+      fedRenameScheduler.shutDown();
+    }
     super.serviceStop();
   }
 
+  boolean isEnableRenameAcrossNamespace() {
+    return routerRenameOption != RouterRenameOption.NONE;
+  }
+
+  BalanceProcedureScheduler getFedRenameScheduler() {
+    return this.fedRenameScheduler;
+  }
+
   /**
    * Get the RPC security manager.
    *
@@ -1889,6 +1964,17 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     return routerProto.getGroupsForUser(user);
   }
 
+  public int getRouterFederationRenameCount() {
+    return clientProto.getRouterFederationRenameCount();
+  }
+
+  public int getSchedulerJobCount() {
+    if (fedRenameScheduler == null) {
+      return 0;
+    }
+    return fedRenameScheduler.getAllJobs().size();
+  }
+
   /**
    * Deals with loading datanode report into the cache and refresh.
    */

+ 59 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml

@@ -702,4 +702,63 @@
       concurrent calls.
     </description>
   </property>
+
+  <property>
+    <name>dfs.federation.router.federation.rename.bandwidth</name>
+    <value></value>
+    <description>
+      Specify bandwidth per map in MB.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.federation.rename.map</name>
+    <value></value>
+    <description>
+      Max number of concurrent maps to use for copy.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.federation.rename.delay</name>
+    <value>1000</value>
+    <description>
+      Specify the delayed duration(millie seconds) when the job needs to retry.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.federation.rename.diff</name>
+    <value>0</value>
+    <description>
+      Specify the threshold of the diff entries that used in incremental copy
+      stage.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.federation.rename.option</name>
+    <value>NONE</value>
+    <description>
+      Specify the action when rename across namespaces. The option can be NONE
+      and DISTCP.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.federation.rename.force.close.open.file</name>
+    <value>true</value>
+    <description>
+      Force close all open files when there is no diff in the DIFF_DISTCP stage.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.federation.rename.trash</name>
+    <value>trash</value>
+    <description>
+      This options has 3 values: trash (move the source path to trash), delete
+      (delete the source path directly) and skip (skip both trash and deletion).
+    </description>
+  </property>
 </configuration>

+ 16 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md

@@ -509,4 +509,19 @@ Metrics
 -------
 
 The Router and State Store statistics are exposed in metrics/JMX. These info will be very useful for monitoring.
-More metrics info can see [RBF Metrics](../../hadoop-project-dist/hadoop-common/Metrics.html#RBFMetrics), [Router RPC Metrics](../../hadoop-project-dist/hadoop-common/Metrics.html#RouterRPCMetrics) and [State Store Metrics](../../hadoop-project-dist/hadoop-common/Metrics.html#StateStoreMetrics).
+More metrics info can see [RBF Metrics](../../hadoop-project-dist/hadoop-common/Metrics.html#RBFMetrics), [Router RPC Metrics](../../hadoop-project-dist/hadoop-common/Metrics.html#RouterRPCMetrics) and [State Store Metrics](../../hadoop-project-dist/hadoop-common/Metrics.html#StateStoreMetrics).
+
+Router Federation Rename
+-------
+
+Enable Router to rename across namespaces. Currently the router federation rename is implemented by distcp. We must set the rpc timeout high enough so it won't timeout.
+
+| Property | Default | Description|
+|:---- |:---- |:---- |
+| dfs.federation.router.federation.rename.option | NONE | Specify the action when rename across namespaces. The option can be NONE(reject rename across namespaces) and DISTCP(rename across namespaces with distcp). |
+| dfs.federation.router.federation.rename.force.close.open.file | true | Force close all open files when there is no diff in the DIFF_DISTCP stage.|
+| dfs.federation.router.federation.rename.map |  | Max number of concurrent maps to use for copy.|
+| dfs.federation.router.federation.rename.bandwidth |  | Specify bandwidth per map in MB.|
+| dfs.federation.router.federation.rename.delay | 1000 | Specify the delayed duration(millie seconds) when the job needs to retry.|
+| dfs.federation.router.federation.rename.diff | 0 | Specify the threshold of the diff entries that used in incremental copy stage.|
+| dfs.federation.router.federation.rename.trash | trash | This options has 3 values: trash (move the source path to trash), delete (delete the source path directly) and skip (skip both trash and deletion).|

+ 28 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java

@@ -17,11 +17,17 @@
  */
 package org.apache.hadoop.hdfs.server.federation;
 
+import static org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename.RouterRenameOption;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
+
 /**
  * Constructs a router configuration with individual features enabled/disabled.
  */
@@ -38,7 +44,9 @@ public class RouterConfigBuilder {
   private boolean enableMetrics = false;
   private boolean enableQuota = false;
   private boolean enableSafemode = false;
+  private RouterRenameOption routerRenameOption = RouterRenameOption.NONE;
   private boolean enableCacheRefresh;
+  private Map<String, String> innerMap = new HashMap<>();
 
   public RouterConfigBuilder(Configuration configuration) {
     this.conf = configuration;
@@ -95,6 +103,11 @@ public class RouterConfigBuilder {
     return this;
   }
 
+  public RouterConfigBuilder routerRenameOption(RouterRenameOption option) {
+    this.routerRenameOption = option;
+    return this;
+  }
+
   public RouterConfigBuilder quota(boolean enable) {
     this.enableQuota = enable;
     return this;
@@ -138,6 +151,10 @@ public class RouterConfigBuilder {
     return this.metrics(true);
   }
 
+  public RouterConfigBuilder routerRenameOption() {
+    return this.routerRenameOption(RouterRenameOption.DISTCP);
+  }
+
   public RouterConfigBuilder quota() {
     return this.quota(true);
   }
@@ -150,6 +167,13 @@ public class RouterConfigBuilder {
     return this.refreshCache(true);
   }
 
+  public RouterConfigBuilder set(String key, String value) {
+    if (key != null && value != null) {
+      innerMap.put(key, value);
+    }
+    return this;
+  }
+
   public Configuration build() {
     conf.setBoolean(RBFConfigKeys.DFS_ROUTER_STORE_ENABLE,
         this.enableStateStore);
@@ -183,6 +207,10 @@ public class RouterConfigBuilder {
         this.enableSafemode);
     conf.setBoolean(RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE,
         this.enableCacheRefresh);
+    conf.set(DFS_ROUTER_FEDERATION_RENAME_OPTION, routerRenameOption.name());
+    for (Map.Entry<String, String> kv : innerMap.entrySet()) {
+      conf.set(kv.getKey(), kv.getValue());
+    }
     return conf;
   }
 }

+ 455 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederationRename.java

@@ -0,0 +1,455 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_MAP;
+import static org.apache.hadoop.test.GenericTestUtils.getMethodName;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.MockResolver;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.tools.fedbalance.DistCpProcedure;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Basic tests of router federation rename. Rename across namespaces.
+ */
+public class TestRouterFederationRename {
+
+  private static final int NUM_SUBCLUSTERS = 2;
+  private static final int NUM_DNS = 6;
+
+  /** Federated HDFS cluster. */
+  private static MiniRouterDFSCluster cluster;
+
+  /** Random Router for this federated cluster. */
+  private RouterContext router;
+
+  /** Random nameservice in the federated cluster.  */
+  private String ns;
+  /** Filesystem interface to the Router. */
+  private FileSystem routerFS;
+  /** Filesystem interface to the Namenode. */
+  private FileSystem nnFS;
+  /** File in the Namenode. */
+  private String nnFile;
+
+  @BeforeClass
+  public static void globalSetUp() throws Exception {
+    Configuration namenodeConf = new Configuration();
+    namenodeConf.setBoolean(DFSConfigKeys.HADOOP_CALLER_CONTEXT_ENABLED_KEY,
+        true);
+    cluster = new MiniRouterDFSCluster(false, NUM_SUBCLUSTERS);
+    cluster.setNumDatanodesPerNameservice(NUM_DNS);
+    cluster.addNamenodeOverrides(namenodeConf);
+    cluster.setIndependentDNs();
+
+    Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 5);
+    cluster.addNamenodeOverrides(conf);
+    // Start NNs and DNs and wait until ready.
+    cluster.startCluster();
+
+    // Start routers, enable router federation rename.
+    String journal = "hdfs://" + cluster.getCluster().getNameNode(1)
+        .getClientNamenodeAddress() + "/journal";
+    Configuration routerConf = new RouterConfigBuilder()
+        .metrics()
+        .rpc()
+        .routerRenameOption()
+        .set(SCHEDULER_JOURNAL_URI, journal)
+        .set(DFS_ROUTER_FEDERATION_RENAME_MAP, "1")
+        .set(DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH, "1")
+        .build();
+    // We decrease the DN cache times to make the test faster.
+    routerConf.setTimeDuration(
+        RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
+    cluster.addRouterOverrides(routerConf);
+    cluster.startRouters();
+
+    // Register and verify all NNs with all routers
+    cluster.registerNamenodes();
+    cluster.waitNamenodeRegistration();
+
+    // We decrease the DN heartbeat expire interval to make them dead faster
+    cluster.getCluster().getNamesystem(0).getBlockManager()
+        .getDatanodeManager().setHeartbeatInterval(1);
+    cluster.getCluster().getNamesystem(1).getBlockManager()
+        .getDatanodeManager().setHeartbeatInterval(1);
+    cluster.getCluster().getNamesystem(0).getBlockManager()
+        .getDatanodeManager().setHeartbeatExpireInterval(3000);
+    cluster.getCluster().getNamesystem(1).getBlockManager()
+        .getDatanodeManager().setHeartbeatExpireInterval(3000);
+    DistCpProcedure.enableForTest();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    cluster.shutdown();
+    DistCpProcedure.disableForTest();
+  }
+
+  @Before
+  public void testSetup() throws Exception {
+
+    // Create mock locations
+    cluster.installMockLocations();
+
+    // Delete all files via the NNs and verify
+    cluster.deleteAllFiles();
+
+    // Create test fixtures on NN
+    cluster.createTestDirectoriesNamenode();
+
+    // Wait to ensure NN has fully created its test directories
+    Thread.sleep(100);
+
+    // Random router for this test
+    RouterContext rndRouter = cluster.getRandomRouter();
+    this.setRouter(rndRouter);
+
+    // Create a mount that points to 2 dirs in the same ns:
+    // /same
+    //   ns0 -> /
+    //   ns0 -> /target-ns0
+    for (RouterContext rc : cluster.getRouters()) {
+      Router r = rc.getRouter();
+      MockResolver resolver = (MockResolver) r.getSubclusterResolver();
+      List<String> nss = cluster.getNameservices();
+      String ns0 = nss.get(0);
+      resolver.addLocation("/same", ns0, "/");
+      resolver.addLocation("/same", ns0, cluster.getNamenodePathForNS(ns0));
+    }
+
+    // Pick a namenode for this test
+    String ns0 = cluster.getNameservices().get(0);
+    this.setNs(ns0);
+    this.setNamenode(cluster.getNamenode(ns0, null));
+
+    // Create a test file on the NN
+    Random rnd = new Random();
+    String randomFile = "testfile-" + rnd.nextInt();
+    this.nnFile =
+        cluster.getNamenodeTestDirectoryForNS(ns) + "/" + randomFile;
+
+    createFile(nnFS, nnFile, 32);
+    verifyFileExists(nnFS, nnFile);
+  }
+
+  protected void createDir(FileSystem fs, String dir) throws IOException {
+    fs.mkdirs(new Path(dir));
+    String file = dir + "/file";
+    createFile(fs, file, 32);
+    verifyFileExists(fs, dir);
+    verifyFileExists(fs, file);
+  }
+
+  protected void testRenameDir(RouterContext testRouter, String path,
+      String renamedPath, boolean exceptionExpected, Callable<Object> call)
+      throws IOException {
+    createDir(testRouter.getFileSystem(), path);
+    // rename
+    boolean exceptionThrown = false;
+    try {
+      call.call();
+      assertFalse(verifyFileExists(testRouter.getFileSystem(), path));
+      assertTrue(
+          verifyFileExists(testRouter.getFileSystem(), renamedPath + "/file"));
+    } catch (Exception ex) {
+      exceptionThrown = true;
+      assertTrue(verifyFileExists(testRouter.getFileSystem(), path + "/file"));
+      assertFalse(verifyFileExists(testRouter.getFileSystem(), renamedPath));
+    } finally {
+      FileContext fileContext = testRouter.getFileContext();
+      fileContext.delete(new Path(path), true);
+      fileContext.delete(new Path(renamedPath), true);
+    }
+    if (exceptionExpected) {
+      // Error was expected.
+      assertTrue(exceptionThrown);
+    } else {
+      // No error was expected.
+      assertFalse(exceptionThrown);
+    }
+  }
+
+  protected void setRouter(RouterContext r) throws IOException {
+    this.router = r;
+    this.routerFS = r.getFileSystem();
+  }
+
+  protected void setNs(String nameservice) {
+    this.ns = nameservice;
+  }
+
+  protected void setNamenode(NamenodeContext nn) throws IOException {
+    this.nnFS = nn.getFileSystem();
+  }
+
+  protected FileSystem getRouterFileSystem() {
+    return this.routerFS;
+  }
+
+  @Test
+  public void testSuccessfulRbfRename() throws Exception {
+    List<String> nss = cluster.getNameservices();
+    String ns0 = nss.get(0);
+    String ns1 = nss.get(1);
+
+    // Test successfully rename a dir to a destination that is in a different
+    // namespace.
+    String dir =
+        cluster.getFederatedTestDirectoryForNS(ns0) + "/" + getMethodName();
+    String renamedDir =
+        cluster.getFederatedTestDirectoryForNS(ns1) + "/" + getMethodName();
+    testRenameDir(router, dir, renamedDir, false, () -> {
+      DFSClient client = router.getClient();
+      ClientProtocol clientProtocol = client.getNamenode();
+      clientProtocol.rename(dir, renamedDir);
+      return null;
+    });
+    testRenameDir(router, dir, renamedDir, false, () -> {
+      DFSClient client = router.getClient();
+      ClientProtocol clientProtocol = client.getNamenode();
+      clientProtocol.rename2(dir, renamedDir);
+      return null;
+    });
+  }
+
+  @Test
+  public void testRbfRenameFile() throws Exception {
+    List<String> nss = cluster.getNameservices();
+    String ns0 = nss.get(0);
+    String ns1 = nss.get(1);
+
+    // Test router federation rename a file.
+    String file =
+        cluster.getFederatedTestDirectoryForNS(ns0) + "/" + getMethodName();
+    String renamedFile =
+        cluster.getFederatedTestDirectoryForNS(ns1) + "/" + getMethodName();
+    createFile(routerFS, file, 32);
+    getRouterFileSystem().mkdirs(new Path(renamedFile));
+    LambdaTestUtils.intercept(RemoteException.class, "should be a directory",
+        "Expect RemoteException.", () -> {
+          DFSClient client = router.getClient();
+          ClientProtocol clientProtocol = client.getNamenode();
+          clientProtocol.rename(file, renamedFile);
+          return null;
+        });
+    LambdaTestUtils.intercept(RemoteException.class, "should be a directory",
+        "Expect RemoteException.", () -> {
+          DFSClient client = router.getClient();
+          ClientProtocol clientProtocol = client.getNamenode();
+          clientProtocol.rename2(file, renamedFile);
+          return null;
+        });
+    getRouterFileSystem().delete(new Path(file), true);
+    getRouterFileSystem().delete(new Path(renamedFile), true);
+  }
+
+  @Test
+  public void testRbfRenameWhenDstAlreadyExists() throws Exception {
+    List<String> nss = cluster.getNameservices();
+    String ns0 = nss.get(0);
+    String ns1 = nss.get(1);
+
+    // Test router federation rename a path to a destination that is in a
+    // different namespace and already exists.
+    String dir =
+        cluster.getFederatedTestDirectoryForNS(ns0) + "/" + getMethodName();
+    String renamedDir =
+        cluster.getFederatedTestDirectoryForNS(ns1) + "/" + getMethodName();
+    createDir(routerFS, dir);
+    getRouterFileSystem().mkdirs(new Path(renamedDir));
+    LambdaTestUtils.intercept(RemoteException.class, "already exists",
+        "Expect RemoteException.", () -> {
+          DFSClient client = router.getClient();
+          ClientProtocol clientProtocol = client.getNamenode();
+          clientProtocol.rename(dir, renamedDir);
+          return null;
+        });
+    LambdaTestUtils.intercept(RemoteException.class, "already exists",
+        "Expect RemoteException.", () -> {
+          DFSClient client = router.getClient();
+          ClientProtocol clientProtocol = client.getNamenode();
+          clientProtocol.rename2(dir, renamedDir);
+          return null;
+        });
+    getRouterFileSystem().delete(new Path(dir), true);
+    getRouterFileSystem().delete(new Path(renamedDir), true);
+  }
+
+  @Test
+  public void testRbfRenameWhenSrcNotExists() throws Exception {
+    List<String> nss = cluster.getNameservices();
+    String ns0 = nss.get(0);
+    String ns1 = nss.get(1);
+
+    // Test router federation rename un-existed path.
+    String dir =
+        cluster.getFederatedTestDirectoryForNS(ns0) + "/" + getMethodName();
+    String renamedDir =
+        cluster.getFederatedTestDirectoryForNS(ns1) + "/" + getMethodName();
+    LambdaTestUtils.intercept(RemoteException.class, "File does not exist",
+        "Expect RemoteException.", () -> {
+          DFSClient client = router.getClient();
+          ClientProtocol clientProtocol = client.getNamenode();
+          clientProtocol.rename(dir, renamedDir);
+          return null;
+        });
+    LambdaTestUtils.intercept(RemoteException.class, "File does not exist",
+        "Expect RemoteException.", () -> {
+          DFSClient client = router.getClient();
+          ClientProtocol clientProtocol = client.getNamenode();
+          clientProtocol.rename2(dir, renamedDir);
+          return null;
+        });
+  }
+
+  @Test
+  public void testRbfRenameOfMountPoint() throws Exception {
+    List<String> nss = cluster.getNameservices();
+    String ns0 = nss.get(0);
+    String ns1 = nss.get(1);
+
+    // Test router federation rename a mount point.
+    String dir = cluster.getFederatedPathForNS(ns0);
+    String renamedDir = cluster.getFederatedPathForNS(ns1);
+    LambdaTestUtils.intercept(RemoteException.class, "is a mount point",
+        "Expect RemoteException.", () -> {
+          DFSClient client = router.getClient();
+          ClientProtocol clientProtocol = client.getNamenode();
+          clientProtocol.rename(dir, renamedDir);
+          return null;
+        });
+    LambdaTestUtils.intercept(RemoteException.class, "is a mount point",
+        "Expect RemoteException.", () -> {
+          DFSClient client = router.getClient();
+          ClientProtocol clientProtocol = client.getNamenode();
+          clientProtocol.rename2(dir, renamedDir);
+          return null;
+        });
+  }
+
+  @Test
+  public void testRbfRenameWithMultiDestination() throws Exception {
+    List<String> nss = cluster.getNameservices();
+    String ns1 = nss.get(1);
+    FileSystem rfs = getRouterFileSystem();
+
+    // Test router federation rename a path with multi-destination.
+    String dir = "/same/" + getMethodName();
+    String renamedDir = cluster.getFederatedTestDirectoryForNS(ns1) + "/"
+        + getMethodName();
+    createDir(rfs, dir);
+    getRouterFileSystem().mkdirs(new Path(renamedDir));
+    LambdaTestUtils.intercept(RemoteException.class,
+        "The remote location should be exactly one", "Expect RemoteException.",
+        () -> {
+          DFSClient client = router.getClient();
+          ClientProtocol clientProtocol = client.getNamenode();
+          clientProtocol.rename(dir, renamedDir);
+          return null;
+        });
+    LambdaTestUtils.intercept(RemoteException.class,
+        "The remote location should be exactly one", "Expect RemoteException.",
+        () -> {
+          DFSClient client = router.getClient();
+          ClientProtocol clientProtocol = client.getNamenode();
+          clientProtocol.rename2(dir, renamedDir);
+          return null;
+        });
+    getRouterFileSystem().delete(new Path(dir), true);
+    getRouterFileSystem().delete(new Path(renamedDir), true);
+  }
+
+  @Test(timeout = 10000)
+  public void testCounter() throws Exception {
+    final RouterRpcServer rpcServer = router.getRouter().getRpcServer();
+    List<String> nss = cluster.getNameservices();
+    String ns0 = nss.get(0);
+    String ns1 = nss.get(1);
+    RouterFederationRename rbfRename =
+        Mockito.spy(new RouterFederationRename(rpcServer, router.getConf()));
+    String path = "/src";
+    createDir(cluster.getCluster().getFileSystem(0), path);
+    // Watch the scheduler job count.
+    int expectedSchedulerCount = rpcServer.getSchedulerJobCount() + 1;
+    AtomicInteger maxSchedulerCount = new AtomicInteger();
+    AtomicBoolean watch = new AtomicBoolean(true);
+    Thread watcher = new Thread(() -> {
+      while (watch.get()) {
+        int schedulerCount = rpcServer.getSchedulerJobCount();
+        if (schedulerCount > maxSchedulerCount.get()) {
+          maxSchedulerCount.set(schedulerCount);
+        }
+        try {
+          Thread.sleep(1);
+        } catch (InterruptedException e) {
+        }
+      }
+    });
+    watcher.start();
+    // Trigger rename.
+    rbfRename.routerFedRename("/src", "/dst",
+        Arrays.asList(new RemoteLocation(ns0, path, null)),
+        Arrays.asList(new RemoteLocation(ns1, path, null)));
+    // Verify count.
+    verify(rbfRename).countIncrement();
+    verify(rbfRename).countDecrement();
+    watch.set(false);
+    watcher.interrupt();
+    watcher.join();
+    assertEquals(expectedSchedulerCount, maxSchedulerCount.get());
+    // Clean up.
+    assertFalse(cluster.getCluster().getFileSystem(0).exists(new Path(path)));
+    assertTrue(
+        cluster.getCluster().getFileSystem(1).delete(new Path(path), true));
+  }
+}

+ 8 - 0
hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/DistCpProcedure.java

@@ -112,6 +112,14 @@ public class DistCpProcedure extends BalanceProcedure {
   @VisibleForTesting
   static boolean enabledForTest = false;
 
+  public static void enableForTest() {
+    enabledForTest = true;
+  }
+
+  public static void disableForTest() {
+    enabledForTest = false;
+  }
+
   public DistCpProcedure() {
   }
 

+ 5 - 5
hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalance.java

@@ -59,9 +59,9 @@ public class FedBalance extends Configured implements Tool {
       LoggerFactory.getLogger(FedBalance.class);
   private static final String SUBMIT_COMMAND = "submit";
   private static final String CONTINUE_COMMAND = "continue";
-  private static final String NO_MOUNT = "no-mount";
-  private static final String DISTCP_PROCEDURE = "distcp-procedure";
-  private static final String TRASH_PROCEDURE = "trash-procedure";
+  public static final String NO_MOUNT = "no-mount";
+  public static final String DISTCP_PROCEDURE = "distcp-procedure";
+  public static final String TRASH_PROCEDURE = "trash-procedure";
 
   public static final String FED_BALANCE_DEFAULT_XML =
       "hdfs-fedbalance-default.xml";
@@ -70,7 +70,7 @@ public class FedBalance extends Configured implements Tool {
   /**
    * This class helps building the balance job.
    */
-  private class Builder {
+  private final class Builder {
     /* Force close all open files while there is no diff. */
     private boolean forceCloseOpen = false;
     /* Max number of concurrent maps to use for copy. */
@@ -88,7 +88,7 @@ public class FedBalance extends Configured implements Tool {
     /* The dst input. This specifies the dst path. */
     private final String inputDst;
 
-    Builder(String inputSrc, String inputDst) {
+    private Builder(String inputSrc, String inputDst) {
       this.inputSrc = inputSrc;
       this.inputDst = inputDst;
     }

+ 25 - 12
hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/FedBalanceContext.java

@@ -176,21 +176,34 @@ public class FedBalanceContext implements Writable {
 
   @Override
   public String toString() {
-    StringBuilder builder = new StringBuilder("FedBalance context:");
-    builder.append(" src=").append(src);
-    builder.append(", dst=").append(dst);
+    StringBuilder builder = new StringBuilder();
+    builder.append("Move ").append(src).append(" to ").append(dst);
     if (useMountReadOnly) {
-      builder.append(", router-mode=true");
-      builder.append(", mount-point=").append(mount);
+      builder.append(" using router mode, mount point=").append(mount)
+          .append(".");
     } else {
-      builder.append(", router-mode=false");
+      builder.append(" using normal federation mode.");
     }
-    builder.append(", forceCloseOpenFiles=").append(forceCloseOpenFiles);
-    builder.append(", trash=").append(trashOpt.name());
-    builder.append(", map=").append(mapNum);
-    builder.append(", bandwidth=").append(bandwidthLimit);
-    builder.append(", delayDuration=").append(delayDuration);
-    builder.append(", diffThreshold=").append(diffThreshold);
+    builder.append(" Submit distcp job with map=").append(mapNum)
+        .append(" and bandwidth=").append(bandwidthLimit).append(".");
+    builder.append(" When the diff count is no greater than ")
+        .append(diffThreshold);
+    if (forceCloseOpenFiles) {
+      builder.append(", force close all open files.");
+    } else {
+      builder.append(", wait until there is no open files.");
+    }
+    switch (trashOpt) {
+    case DELETE:
+      builder.append(" Delete the src after the job is complete.");
+      break;
+    case TRASH:
+      builder.append(" Move the src to trash after the job is complete.");
+      break;
+    default:
+      break;
+    }
+    builder.append(" Delay duration is ").append(delayDuration).append("ms.");
     return builder.toString();
   }
 

+ 1 - 0
hadoop-tools/hadoop-federation-balance/src/main/java/org/apache/hadoop/tools/fedbalance/procedure/BalanceProcedureScheduler.java

@@ -292,6 +292,7 @@ public class BalanceProcedureScheduler {
     for (BalanceJob job : jobs) {
       recoverQueue.add(job);
       jobSet.put(job, job);
+      LOG.info("Recover federation balance job {}.", job);
     }
   }
 

+ 2 - 2
hadoop-tools/hadoop-federation-balance/src/test/java/org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.java

@@ -76,7 +76,7 @@ public class TestDistCpProcedure {
 
   @BeforeClass
   public static void beforeClass() throws IOException {
-    DistCpProcedure.enabledForTest = true;
+    DistCpProcedure.enableForTest();
     conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE);
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
@@ -92,7 +92,7 @@ public class TestDistCpProcedure {
 
   @AfterClass
   public static void afterClass() {
-    DistCpProcedure.enabledForTest = false;
+    DistCpProcedure.disableForTest();
     if (cluster != null) {
       cluster.shutdown();
     }