Ver código fonte

HDFS-13916. Distcp SnapshotDiff to support WebHDFS. Contributed by Xun REN.

Signed-off-by: Masatake Iwasaki <iwasakims@apache.org>
Masatake Iwasaki 3 anos atrás
pai
commit
3788fe52da

+ 48 - 26
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.tools.CopyListing.InvalidInputException;
 
 import java.io.FileNotFoundException;
@@ -40,7 +41,8 @@ import java.util.HashSet;
 /**
  * This class provides the basic functionality to sync two FileSystems based on
  * the snapshot diff report. More specifically, we have the following settings:
- * 1. Both the source and target FileSystem must be DistributedFileSystem
+ * 1. Both the source and target FileSystem must be DistributedFileSystem or
+ * (s)WebHdfsFileSystem
  * 2. Two snapshots (e.g., s1 and s2) have been created on the source FS.
  * The diff between these two snapshots will be copied to the target FS.
  * 3. The target has the same snapshot s1. No changes have been made on the
@@ -73,7 +75,7 @@ class DistCpSync {
   /**
    * Check if three conditions are met before sync.
    * 1. Only one source directory.
-   * 2. Both source and target file system are DFS.
+   * 2. Both source and target file system are DFS or WebHdfs.
    * 3. There is no change between from and the current status in target
    *    file system.
    *  Throw exceptions if first two aren't met, and return false to fallback to
@@ -95,17 +97,22 @@ class DistCpSync {
     final Path snapshotDiffDir = isRdiff() ? targetDir : sourceDir;
 
     // currently we require both the source and the target file system are
-    // DistributedFileSystem.
-    if (!(srcFs instanceof DistributedFileSystem) ||
-        !(tgtFs instanceof DistributedFileSystem)) {
-      throw new IllegalArgumentException("The FileSystems needs to" +
-          " be DistributedFileSystem for using snapshot-diff-based distcp");
+    // DistributedFileSystem or (S)WebHdfsFileSystem.
+    if (!(srcFs instanceof DistributedFileSystem
+            || srcFs instanceof WebHdfsFileSystem)) {
+      throw new IllegalArgumentException("Unsupported source file system: "
+          + srcFs.getScheme() + "://. " +
+          "Supported file systems: hdfs://, webhdfs:// and swebhdfs://.");
+    }
+    if (!(tgtFs instanceof DistributedFileSystem
+        || tgtFs instanceof WebHdfsFileSystem)) {
+      throw new IllegalArgumentException("Unsupported target file system: "
+          + tgtFs.getScheme() + "://. " +
+          "Supported file systems: hdfs://, webhdfs:// and swebhdfs://.");
     }
-
-    final DistributedFileSystem targetFs = (DistributedFileSystem) tgtFs;
 
     // make sure targetFS has no change between from and the current states
-    if (!checkNoChange(targetFs, targetDir)) {
+    if (!checkNoChange(tgtFs, targetDir)) {
       // set the source path using the snapshot path
       context.setSourcePaths(Arrays.asList(getSnapshotPath(sourceDir,
           context.getToSnapshot())));
@@ -161,23 +168,22 @@ class DistCpSync {
     final Path sourceDir = sourcePaths.get(0);
     final Path targetDir = context.getTargetPath();
     final FileSystem tfs = targetDir.getFileSystem(conf);
-    final DistributedFileSystem targetFs = (DistributedFileSystem) tfs;
 
     Path tmpDir = null;
     try {
-      tmpDir = createTargetTmpDir(targetFs, targetDir);
+      tmpDir = createTargetTmpDir(tfs, targetDir);
       DiffInfo[] renameAndDeleteDiffs =
           getRenameAndDeleteDiffsForSync(targetDir);
       if (renameAndDeleteDiffs.length > 0) {
         // do the real sync work: deletion and rename
-        syncDiff(renameAndDeleteDiffs, targetFs, tmpDir);
+        syncDiff(renameAndDeleteDiffs, tfs, tmpDir);
       }
       return true;
     } catch (Exception e) {
       DistCp.LOG.warn("Failed to use snapshot diff for distcp", e);
       return false;
     } finally {
-      deleteTargetTmpDir(targetFs, tmpDir);
+      deleteTargetTmpDir(tfs, tmpDir);
       // TODO: since we have tmp directory, we can support "undo" with failures
       // set the source path using the snapshot path
       context.setSourcePaths(Arrays.asList(getSnapshotPath(sourceDir,
@@ -195,12 +201,22 @@ class DistCpSync {
         context.getTargetPath() : context.getSourcePaths().get(0);
 
     try {
-      DistributedFileSystem fs =
-          (DistributedFileSystem) ssDir.getFileSystem(conf);
+      SnapshotDiffReport report = null;
+      FileSystem fs = ssDir.getFileSystem(conf);
       final String from = getSnapshotName(context.getFromSnapshot());
       final String to = getSnapshotName(context.getToSnapshot());
-      SnapshotDiffReport report = fs.getSnapshotDiffReport(ssDir,
-          from, to);
+      if (fs instanceof DistributedFileSystem) {
+        DistributedFileSystem dfs = (DistributedFileSystem)fs;
+        report = dfs.getSnapshotDiffReport(ssDir, from, to);
+      } else if (fs instanceof WebHdfsFileSystem) {
+        WebHdfsFileSystem webHdfs = (WebHdfsFileSystem)fs;
+        report = webHdfs.getSnapshotDiffReport(ssDir, from, to);
+      } else {
+        throw new IllegalArgumentException("Unsupported file system: " +
+            fs.getScheme() + "://. " +
+            "Supported file systems: hdfs://, webhdfs:// and swebhdfs://.");
+      }
+
       this.diffMap = new EnumMap<>(SnapshotDiffReport.DiffType.class);
       for (SnapshotDiffReport.DiffType type :
           SnapshotDiffReport.DiffType.values()) {
@@ -265,7 +281,7 @@ class DistCpSync {
     }
   }
 
-  private Path createTargetTmpDir(DistributedFileSystem targetFs,
+  private Path createTargetTmpDir(FileSystem targetFs,
                                   Path targetDir) throws IOException {
     final Path tmp = new Path(targetDir,
         DistCpConstants.HDFS_DISTCP_DIFF_DIRECTORY_NAME + DistCp.rand.nextInt());
@@ -275,7 +291,7 @@ class DistCpSync {
     return tmp;
   }
 
-  private void deleteTargetTmpDir(DistributedFileSystem targetFs,
+  private void deleteTargetTmpDir(FileSystem targetFs,
                                   Path tmpDir) {
     try {
       if (tmpDir != null) {
@@ -290,11 +306,17 @@ class DistCpSync {
    * Compute the snapshot diff on the given file system. Return true if the diff
    * is empty, i.e., no changes have happened in the FS.
    */
-  private boolean checkNoChange(DistributedFileSystem fs, Path path) {
+  private boolean checkNoChange(FileSystem fs, Path path) {
     try {
       final String from = getSnapshotName(context.getFromSnapshot());
-      SnapshotDiffReport targetDiff =
-          fs.getSnapshotDiffReport(path, from, "");
+      SnapshotDiffReport targetDiff = null;
+      if (fs instanceof DistributedFileSystem) {
+        DistributedFileSystem dfs = (DistributedFileSystem)fs;
+        targetDiff = dfs.getSnapshotDiffReport(path, from, "");
+      } else {
+        WebHdfsFileSystem webHdfs = (WebHdfsFileSystem)fs;
+        targetDiff = webHdfs.getSnapshotDiffReport(path, from, "");
+      }
       if (!targetDiff.getDiffList().isEmpty()) {
         DistCp.LOG.warn("The target has been modified since snapshot "
             + context.getFromSnapshot());
@@ -310,7 +332,7 @@ class DistCpSync {
   }
 
   private void syncDiff(DiffInfo[] diffs,
-      DistributedFileSystem targetFs, Path tmpDir) throws IOException {
+      FileSystem targetFs, Path tmpDir) throws IOException {
     moveToTmpDir(diffs, targetFs, tmpDir);
     moveToTarget(diffs, targetFs);
   }
@@ -320,7 +342,7 @@ class DistCpSync {
    * directory.
    */
   private void moveToTmpDir(DiffInfo[] diffs,
-      DistributedFileSystem targetFs, Path tmpDir) throws IOException {
+      FileSystem targetFs, Path tmpDir) throws IOException {
     // sort the diffs based on their source paths to make sure the files and
     // subdirs are moved before moving their parents/ancestors.
     Arrays.sort(diffs, DiffInfo.sourceComparator);
@@ -341,7 +363,7 @@ class DistCpSync {
    * from the tmp dir to the final targets.
    */
   private void moveToTarget(DiffInfo[] diffs,
-      DistributedFileSystem targetFs) throws IOException {
+      FileSystem targetFs) throws IOException {
     // sort the diffs based on their target paths to make sure the parent
     // directories are created first.
     Arrays.sort(diffs, DiffInfo.targetComparator);

+ 141 - 21
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.tools;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -27,6 +28,9 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+import org.apache.hadoop.hdfs.web.WebHdfsConstants;
+import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
+import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -39,7 +43,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-
 import java.io.IOException;
 import java.io.FileWriter;
 import java.io.BufferedWriter;
@@ -48,12 +51,14 @@ import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class TestDistCpSync {
   private MiniDFSCluster cluster;
   private final Configuration conf = new HdfsConfiguration();
   private DistributedFileSystem dfs;
+  private WebHdfsFileSystem webfs;
   private DistCpContext context;
   private final Path source = new Path("/source");
   private final Path target = new Path("/target");
@@ -65,6 +70,9 @@ public class TestDistCpSync {
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATA_NUM).build();
     cluster.waitActive();
 
+    webfs = WebHdfsTestUtil.
+            getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME);
+
     dfs = cluster.getFileSystem();
     dfs.mkdirs(source);
     dfs.mkdirs(target);
@@ -160,6 +168,10 @@ public class TestDistCpSync {
    *         f3            f4
    */
   private void initData(Path dir) throws Exception {
+    initData(dfs, dir);
+  }
+
+  private void initData(FileSystem fs, Path dir) throws Exception {
     final Path foo = new Path(dir, "foo");
     final Path bar = new Path(dir, "bar");
     final Path d1 = new Path(foo, "d1");
@@ -169,10 +181,10 @@ public class TestDistCpSync {
     final Path f3 = new Path(d1, "f3");
     final Path f4 = new Path(d2, "f4");
 
-    DFSTestUtil.createFile(dfs, f1, BLOCK_SIZE, DATA_NUM, 0);
-    DFSTestUtil.createFile(dfs, f2, BLOCK_SIZE, DATA_NUM, 0);
-    DFSTestUtil.createFile(dfs, f3, BLOCK_SIZE, DATA_NUM, 0);
-    DFSTestUtil.createFile(dfs, f4, BLOCK_SIZE, DATA_NUM, 0);
+    DFSTestUtil.createFile(fs, f1, BLOCK_SIZE, DATA_NUM, 0);
+    DFSTestUtil.createFile(fs, f2, BLOCK_SIZE, DATA_NUM, 0);
+    DFSTestUtil.createFile(fs, f3, BLOCK_SIZE, DATA_NUM, 0);
+    DFSTestUtil.createFile(fs, f4, BLOCK_SIZE, DATA_NUM, 0);
   }
 
   /**
@@ -192,7 +204,7 @@ public class TestDistCpSync {
    *                foo/             f4
    *                f1(new)
    */
-  private int changeData(Path dir) throws Exception {
+  private int changeData(FileSystem fs, Path dir) throws Exception {
     final Path foo = new Path(dir, "foo");
     final Path bar = new Path(dir, "bar");
     final Path d1 = new Path(foo, "d1");
@@ -200,21 +212,21 @@ public class TestDistCpSync {
 
     final Path bar_d1 = new Path(bar, "d1");
     int numCreatedModified = 0;
-    dfs.rename(d1, bar_d1);
+    fs.rename(d1, bar_d1);
     numCreatedModified += 1; // modify ./foo
     numCreatedModified += 1; // modify ./bar
     final Path f3 = new Path(bar_d1, "f3");
-    dfs.delete(f3, true);
+    fs.delete(f3, true);
     final Path newfoo = new Path(bar_d1, "foo");
-    dfs.rename(foo, newfoo);
+    fs.rename(foo, newfoo);
     numCreatedModified += 1; // modify ./foo/d1
     final Path f1 = new Path(newfoo, "f1");
-    dfs.delete(f1, true);
-    DFSTestUtil.createFile(dfs, f1, 2 * BLOCK_SIZE, DATA_NUM, 0);
+    fs.delete(f1, true);
+    DFSTestUtil.createFile(fs, f1, 2 * BLOCK_SIZE, DATA_NUM, 0);
     numCreatedModified += 1; // create ./foo/f1
-    DFSTestUtil.appendFile(dfs, f2, (int) BLOCK_SIZE);
+    DFSTestUtil.appendFile(fs, f2, (int) BLOCK_SIZE);
     numCreatedModified += 1; // modify ./bar/f2
-    dfs.rename(bar, new Path(dir, "foo"));
+    fs.rename(bar, new Path(dir, "foo"));
     return numCreatedModified;
   }
 
@@ -228,7 +240,7 @@ public class TestDistCpSync {
     enableAndCreateFirstSnapshot();
 
     // make changes under source
-    int numCreatedModified = changeData(source);
+    int numCreatedModified = changeData(dfs, source);
     dfs.createSnapshot(source, "s2");
 
     // before sync, make some further changes on source. this should not affect
@@ -295,23 +307,51 @@ public class TestDistCpSync {
     return values;
   }
 
+  /**
+   * By default, we are using DFS for both source and target.
+   * @param s source file status
+   * @param t target file status
+   * @param compareName whether to compare the names of the files
+   * @throws Exception
+   */
   private void verifyCopy(FileStatus s, FileStatus t, boolean compareName)
-      throws Exception {
+          throws Exception {
+    verifyCopy(dfs, dfs, s, t, compareName);
+  }
+
+  /**
+   * Verify copy by using different file systems.
+   * @param sfs source file system
+   * @param tfs target file system
+   * @param s source file status
+   * @param t target file status
+   * @param compareName whether to compare the names of the files
+   * @throws Exception
+   */
+  private void verifyCopyByFs(FileSystem sfs, FileSystem tfs,
+                              FileStatus s, FileStatus t, boolean compareName)
+          throws Exception {
+    verifyCopy(sfs, tfs, s, t, compareName);
+  }
+
+  private void verifyCopy(FileSystem sfs, FileSystem tfs,
+                          FileStatus s, FileStatus t, boolean compareName)
+          throws Exception {
     Assert.assertEquals(s.isDirectory(), t.isDirectory());
     if (compareName) {
       Assert.assertEquals(s.getPath().getName(), t.getPath().getName());
     }
     if (!s.isDirectory()) {
       // verify the file content is the same
-      byte[] sbytes = DFSTestUtil.readFileBuffer(dfs, s.getPath());
-      byte[] tbytes = DFSTestUtil.readFileBuffer(dfs, t.getPath());
+      byte[] sbytes = DFSTestUtil.readFileBuffer(sfs, s.getPath());
+      byte[] tbytes = DFSTestUtil.readFileBuffer(tfs, t.getPath());
       Assert.assertArrayEquals(sbytes, tbytes);
     } else {
-      FileStatus[] slist = dfs.listStatus(s.getPath());
-      FileStatus[] tlist = dfs.listStatus(t.getPath());
+      FileStatus[] slist = sfs.listStatus(s.getPath());
+      FileStatus[] tlist = tfs.listStatus(t.getPath());
       Assert.assertEquals(slist.length, tlist.length);
       for (int i = 0; i < slist.length; i++) {
-        verifyCopy(slist[i], tlist[i], true);
+        verifyCopy(sfs, tfs, slist[i], tlist[i], true);
       }
     }
   }
@@ -333,7 +373,7 @@ public class TestDistCpSync {
     enableAndCreateFirstSnapshot();
 
     // make changes under source
-    changeData(source);
+    changeData(dfs, source);
 
     // do the sync
     sync();
@@ -907,4 +947,84 @@ public class TestDistCpSync {
       deleteFilterFile(filterFile);
     }
   }
+
+  /**
+   * Test DistCp use diff option under (s)WebHdfsFileSystem.
+   * In this test, we are using DFS as source and WebHDFS as target
+   */
+  @Test
+  public void testSyncSnapshotDiffWithWebHdfs1() throws Exception {
+    Path dfsSource = new Path(dfs.getUri().toString(), source);
+    Path webHdfsTarget = new Path(webfs.getUri().toString(), target);
+
+    snapshotDiffWithPaths(dfsSource, webHdfsTarget);
+  }
+
+  /**
+   * Test DistCp use diff option under (s)WebHdfsFileSystem.
+   * In this test, we are using WebHDFS as source and DFS as target
+   */
+  @Test
+  public void testSyncSnapshotDiffWithWebHdfs2() throws Exception {
+    Path webHdfsSource = new Path(webfs.getUri().toString(), source);
+    Path dfsTarget = new Path(dfs.getUri().toString(), target);
+
+    snapshotDiffWithPaths(webHdfsSource, dfsTarget);
+  }
+
+  /**
+   * Test DistCp use diff option under (s)WebHdfsFileSystem.
+   * In this test, we are using WebHDFS for both source and target
+   */
+  @Test
+  public void testSyncSnapshotDiffWithWebHdfs3() throws Exception {
+    Path webHdfsSource = new Path(webfs.getUri().toString(), source);
+    Path webHdfsTarget = new Path(webfs.getUri().toString(), target);
+
+    snapshotDiffWithPaths(webHdfsSource, webHdfsTarget);
+  }
+
+  private void snapshotDiffWithPaths(Path sourceFSPath,
+      Path targetFSPath) throws Exception {
+
+    FileSystem sourceFS = sourceFSPath.getFileSystem(conf);
+    FileSystem targetFS = targetFSPath.getFileSystem(conf);
+
+    // Initialize both source and target file system
+    initData(sourceFS, sourceFSPath);
+    initData(targetFS, targetFSPath);
+
+    // create snapshots on both source and target side with the same name
+    List<Path> paths = Arrays.asList(sourceFSPath, targetFSPath);
+    for (Path path: paths) {
+      FileSystem fs = path.getFileSystem(conf);
+      if (fs instanceof DistributedFileSystem) {
+        ((DistributedFileSystem)fs).allowSnapshot(path);
+      } else if (fs instanceof WebHdfsFileSystem) {
+        ((WebHdfsFileSystem)fs).allowSnapshot(path);
+      } else {
+        throw new IOException("Unsupported fs: " + fs.getScheme());
+      }
+      fs.createSnapshot(path, "s1");
+    }
+
+    // do some modification on source side
+    changeData(sourceFS, sourceFSPath);
+
+    // create a new snapshot on source side
+    sourceFS.createSnapshot(sourceFSPath, "s2");
+
+    //try to copy the difference
+    final DistCpOptions options = new DistCpOptions.Builder(
+        Collections.singletonList(sourceFSPath), targetFSPath)
+        .withUseDiff("s1", "s2")
+        .withSyncFolder(true)
+        .build();
+    options.appendToConf(conf);
+
+    new DistCp(conf, options).execute();
+
+    verifyCopyByFs(sourceFS, targetFS, sourceFS.getFileStatus(sourceFSPath),
+        targetFS.getFileStatus(targetFSPath), false);
+  }
 }