Browse Source

HDFS-14343. RBF: Fix renaming folders spread across multiple subclusters. Contributed by Ayush Saxena.

Giovanni Matteo Fumarola 6 years ago
parent
commit
6c686253e9

+ 44 - 3
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java

@@ -69,6 +69,7 @@ import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
+import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -88,6 +89,8 @@ import org.apache.hadoop.security.token.Token;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.FileNotFoundException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collection;
@@ -466,8 +469,12 @@ public class RouterClientProtocol implements ClientProtocol {
     RemoteMethod method = new RemoteMethod("rename",
     RemoteMethod method = new RemoteMethod("rename",
         new Class<?>[] {String.class, String.class},
         new Class<?>[] {String.class, String.class},
         new RemoteParam(), dstParam);
         new RemoteParam(), dstParam);
-    return rpcClient.invokeSequential(locs, method, Boolean.class,
-        Boolean.TRUE);
+    if (isMultiDestDirectory(src)) {
+      return rpcClient.invokeAll(locs, method);
+    } else {
+      return rpcClient.invokeSequential(locs, method, Boolean.class,
+          Boolean.TRUE);
+    }
   }
   }
 
 
   @Override
   @Override
@@ -488,7 +495,11 @@ public class RouterClientProtocol implements ClientProtocol {
     RemoteMethod method = new RemoteMethod("rename2",
     RemoteMethod method = new RemoteMethod("rename2",
         new Class<?>[] {String.class, String.class, options.getClass()},
         new Class<?>[] {String.class, String.class, options.getClass()},
         new RemoteParam(), dstParam, options);
         new RemoteParam(), dstParam, options);
-    rpcClient.invokeSequential(locs, method, null, null);
+    if (isMultiDestDirectory(src)) {
+      rpcClient.invokeConcurrent(locs, method);
+    } else {
+      rpcClient.invokeSequential(locs, method, null, null);
+    }
   }
   }
 
 
   @Override
   @Override
@@ -1857,4 +1868,34 @@ public class RouterClientProtocol implements ClientProtocol {
     }
     }
     return modTime;
     return modTime;
   }
   }
+
+  /**
+   * Checks if the path is a directory and is supposed to be present in all
+   * subclusters.
+   * @param src the source path
+   * @return true if the path is directory and is supposed to be present in all
+   *         subclusters else false in all other scenarios.
+   * @throws IOException if unable to get the file status.
+   */
+  @VisibleForTesting
+  boolean isMultiDestDirectory(String src) throws IOException {
+    try {
+      if (rpcServer.isPathAll(src)) {
+        List<RemoteLocation> locations;
+        locations = rpcServer.getLocationsForPath(src, false);
+        RemoteMethod method = new RemoteMethod("getFileInfo",
+            new Class<?>[] {String.class}, new RemoteParam());
+        HdfsFileStatus fileStatus = rpcClient.invokeSequential(locations,
+            method, HdfsFileStatus.class, null);
+        if (fileStatus != null) {
+          return fileStatus.isDirectory();
+        } else {
+          LOG.debug("The destination {} doesn't exist.", src);
+        }
+      }
+    } catch (UnresolvedPathException e) {
+      LOG.debug("The destination {} is a symlink.", src);
+    }
+    return false;
+  }
 }
 }

+ 112 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java

@@ -36,6 +36,7 @@ import java.util.TreeSet;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -427,6 +428,117 @@ public class TestRouterRPCMultipleDestinationMountTableResolver {
         null, null, Arrays.asList("ns0", "ns1"));
         null, null, Arrays.asList("ns0", "ns1"));
   }
   }
 
 
+  @Test
+  public void testIsMultiDestDir() throws Exception {
+    RouterClientProtocol client =
+        routerContext.getRouter().getRpcServer().getClientProtocolModule();
+    setupOrderMountPath(DestinationOrder.HASH_ALL);
+    // Should be true only for directory and false for all other cases.
+    assertTrue(client.isMultiDestDirectory("/mount/dir"));
+    assertFalse(client.isMultiDestDirectory("/mount/nodir"));
+    assertFalse(client.isMultiDestDirectory("/mount/dir/file"));
+    routerFs.createSymlink(new Path("/mount/dir/file"),
+        new Path("/mount/dir/link"), true);
+    assertFalse(client.isMultiDestDirectory("/mount/dir/link"));
+    routerFs.createSymlink(new Path("/mount/dir/dir"),
+        new Path("/mount/dir/linkDir"), true);
+    assertFalse(client.isMultiDestDirectory("/mount/dir/linkDir"));
+    resetTestEnvironment();
+    // Test single directory destination. Should be false for the directory.
+    setupOrderMountPath(DestinationOrder.HASH);
+    assertFalse(client.isMultiDestDirectory("/mount/dir"));
+  }
+
+  @Test
+  public void testRenameMultipleDestDirectories() throws Exception {
+    // Test renaming directories using rename API.
+    verifyRenameOnMultiDestDirectories(DestinationOrder.HASH_ALL, false);
+    resetTestEnvironment();
+    verifyRenameOnMultiDestDirectories(DestinationOrder.RANDOM, false);
+    resetTestEnvironment();
+    verifyRenameOnMultiDestDirectories(DestinationOrder.SPACE, false);
+    resetTestEnvironment();
+    // Test renaming directories using rename2 API.
+    verifyRenameOnMultiDestDirectories(DestinationOrder.HASH_ALL, true);
+    resetTestEnvironment();
+    verifyRenameOnMultiDestDirectories(DestinationOrder.RANDOM, true);
+    resetTestEnvironment();
+    verifyRenameOnMultiDestDirectories(DestinationOrder.SPACE, true);
+  }
+
+  /**
+   * Test to verify rename operation on directories in case of multiple
+   * destinations.
+   * @param order order to be followed by the mount entry.
+   * @param isRename2 true if the verification is to be done using rename2(..)
+   *          method.
+   * @throws Exception on account of any exception during test execution.
+   */
+  private void verifyRenameOnMultiDestDirectories(DestinationOrder order,
+      boolean isRename2) throws Exception {
+    setupOrderMountPath(order);
+    Path src = new Path("/mount/dir/dir");
+    Path nnSrc = new Path("/tmp/dir/dir");
+    Path dst = new Path("/mount/dir/subdir");
+    Path nnDst = new Path("/tmp/dir/subdir");
+    Path fileSrc = new Path("/mount/dir/dir/file");
+    Path nnFileSrc = new Path("/tmp/dir/dir/file");
+    Path fileDst = new Path("/mount/dir/subdir/file");
+    Path nnFileDst = new Path("/tmp/dir/subdir/file");
+    DFSTestUtil.createFile(routerFs, fileSrc, 100L, (short) 1, 1024L);
+    if (isRename2) {
+      routerFs.rename(src, dst, Rename.NONE);
+    } else {
+      assertTrue(routerFs.rename(src, dst));
+    }
+    assertTrue(nnFs0.exists(nnDst));
+    assertTrue(nnFs1.exists(nnDst));
+    assertFalse(nnFs0.exists(nnSrc));
+    assertFalse(nnFs1.exists(nnSrc));
+    assertFalse(routerFs.exists(fileSrc));
+    assertTrue(routerFs.exists(fileDst));
+    assertTrue(nnFs0.exists(nnFileDst) || nnFs1.exists(nnFileDst));
+    assertFalse(nnFs0.exists(nnFileSrc) || nnFs1.exists(nnFileSrc));
+
+    // Verify rename file.
+    Path fileRenamed = new Path("/mount/dir/subdir/renamedFile");
+    Path nnFileRenamed = new Path("/tmp/dir/subdir/renamedFile");
+    if (isRename2) {
+      routerFs.rename(fileDst, fileRenamed, Rename.NONE);
+    } else {
+      assertTrue(routerFs.rename(fileDst, fileRenamed));
+    }
+    assertTrue(routerFs.exists(fileRenamed));
+    assertFalse(routerFs.exists(fileDst));
+    assertTrue(nnFs0.exists(nnFileRenamed) || nnFs1.exists(nnFileRenamed));
+    assertFalse(nnFs0.exists(nnFileDst) || nnFs1.exists(nnFileDst));
+
+    // Verify rename when one source directory is not present.
+    Path dst1 = new Path("/mount/dir/renameddir");
+    Path nnDst1 = new Path("/tmp/dir/renameddir");
+    nnFs1.delete(nnDst, true);
+    if (isRename2) {
+      routerFs.rename(dst, dst1, Rename.NONE);
+    } else {
+      assertTrue(routerFs.rename(dst, dst1));
+    }
+    assertTrue(nnFs0.exists(nnDst1));
+    assertFalse(nnFs0.exists(nnDst));
+
+    // Verify rename when one destination directory is already present.
+    Path src1 = new Path("/mount/dir");
+    Path dst2 = new Path("/mount/OneDest");
+    Path nnDst2 = new Path("/tmp/OneDest");
+    nnFs0.mkdirs(nnDst2);
+    if (isRename2) {
+      routerFs.rename(src1, dst2, Rename.NONE);
+    } else {
+      assertTrue(routerFs.rename(src1, dst2));
+    }
+    assertTrue(nnFs0.exists(nnDst2));
+    assertTrue(nnFs1.exists(nnDst2));
+  }
+
   /**
   /**
    * Generic test for getting the destination subcluster.
    * Generic test for getting the destination subcluster.
    * @param order DestinationOrder of the mount point.
    * @param order DestinationOrder of the mount point.