Browse Source

HDFS-14869 Copy renamed files which are not excluded anymore by filter (#1530)

aasha 5 years ago
parent
commit
fccccc9703

+ 1 - 1
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java

@@ -84,7 +84,7 @@ public class DistCp extends Configured implements Tool {
     if (context.shouldUseSnapshotDiff()) {
       // When "-diff" or "-rdiff" is passed, do sync() first, then
       // create copyListing based on snapshot diff.
-      DistCpSync distCpSync = new DistCpSync(context, getConf());
+      DistCpSync distCpSync = new DistCpSync(context, job.getConfiguration());
       if (distCpSync.sync()) {
         createInputFileListingWithDiff(job, distCpSync);
       } else {

+ 24 - 7
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java

@@ -57,10 +57,13 @@ class DistCpSync {
   //
   private EnumMap<SnapshotDiffReport.DiffType, List<DiffInfo>> diffMap;
   private DiffInfo[] renameDiffs;
+  private CopyFilter copyFilter;
 
   DistCpSync(DistCpContext context, Configuration conf) {
     this.context = context;
     this.conf = conf;
+    this.copyFilter = CopyFilter.getCopyFilter(conf); // filter is looked up from conf
+    this.copyFilter.initialize(); // initialized here, ahead of the shouldCopy() checks
   }
 
   private boolean isRdiff() {
@@ -213,18 +216,32 @@ class DistCpSync {
         }
         SnapshotDiffReport.DiffType dt = entry.getType();
         List<DiffInfo> list = diffMap.get(dt);
+        final Path source =
+                new Path(DFSUtilClient.bytes2String(entry.getSourcePath()));
+        final Path relativeSource = new Path(Path.SEPARATOR + source);
         if (dt == SnapshotDiffReport.DiffType.MODIFY ||
             dt == SnapshotDiffReport.DiffType.CREATE ||
             dt == SnapshotDiffReport.DiffType.DELETE) {
-          final Path source =
-              new Path(DFSUtilClient.bytes2String(entry.getSourcePath()));
-          list.add(new DiffInfo(source, null, dt));
+          if (copyFilter.shouldCopy(relativeSource)) {
+            list.add(new DiffInfo(source, null, dt));
+          }
         } else if (dt == SnapshotDiffReport.DiffType.RENAME) {
-          final Path source =
-              new Path(DFSUtilClient.bytes2String(entry.getSourcePath()));
           final Path target =
-              new Path(DFSUtilClient.bytes2String(entry.getTargetPath()));
-          list.add(new DiffInfo(source, target, dt));
+                  new Path(DFSUtilClient.bytes2String(entry.getTargetPath()));
+          final Path relativeTarget = new Path(Path.SEPARATOR + target);
+          if (copyFilter.shouldCopy(relativeSource)) {
+            if (copyFilter.shouldCopy(relativeTarget)) {
+              list.add(new DiffInfo(source, target, dt));
+            } else {
+              list = diffMap.get(SnapshotDiffReport.DiffType.DELETE);
+              list.add(new DiffInfo(source, target,
+                      SnapshotDiffReport.DiffType.DELETE));
+            }
+          } else if (copyFilter.shouldCopy(relativeTarget)) {
+            list = diffMap.get(SnapshotDiffReport.DiffType.CREATE);
+            list.add(new DiffInfo(target, null,
+                    SnapshotDiffReport.DiffType.CREATE));
+          }
         }
       }
       return true;

+ 160 - 0
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java

@@ -39,6 +39,13 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+
+import java.io.IOException;
+import java.io.FileWriter;
+import java.io.BufferedWriter;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -747,4 +754,157 @@ public class TestDistCpSync {
     }
     Assert.assertTrue(threwException);
   }
+
+  /**
+   * Creates the initial layout for testSync10: a file "f1" under a
+   * ".staging" directory and a file "f1" under a "data" directory, both
+   * rooted at {@code dir}.
+   */
+  private void initData10(Path dir) throws Exception {
+    final Path stagedFile = new Path(new Path(dir, ".staging"), "f1");
+    final Path dataFile = new Path(new Path(dir, "data"), "f1");
+
+    DFSTestUtil.createFile(dfs, stagedFile, BLOCK_SIZE, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, dataFile, BLOCK_SIZE, DATA_NUM, 0L);
+  }
+
+  /** Renames the ".staging" directory under {@code dir} to "prod". */
+  private void changeData10(Path dir) throws Exception {
+    dfs.rename(new Path(dir, ".staging"), new Path(dir, "prod"));
+  }
+
+  /**
+   * Creates a temporary filters file holding a single regular expression
+   * that matches any path containing ".staging".
+   *
+   * @param fileName prefix used for the temporary file's name
+   * @return path of the created file; callers delete it when done
+   * @throws IOException if the file cannot be created or written
+   */
+  private java.nio.file.Path generateFilterFile(String fileName)
+          throws IOException {
+    // createTempFile appends the suffix verbatim, so the dot has to be
+    // part of it to get a ".txt" extension.
+    java.nio.file.Path tmpFile = Files.createTempFile(fileName, ".txt");
+    String str = ".*\\.staging.*";
+    // Write with an explicit charset so the file content does not depend
+    // on the platform default encoding.
+    try (BufferedWriter writer = Files.newBufferedWriter(tmpFile,
+            java.nio.charset.StandardCharsets.UTF_8)) {
+      writer.write(str);
+    }
+    return tmpFile;
+  }
+
+  /**
+   * Removes a filters file created for a test.
+   *
+   * @param filePath file to remove; {@code null} is accepted and ignored
+   *                 because callers invoke this from {@code finally} blocks
+   *                 where the file may not have been created yet
+   * @throws IOException if the file exists but cannot be deleted
+   */
+  private void deleteFilterFile(java.nio.file.Path filePath)
+          throws IOException {
+    if (filePath == null) {
+      // Nothing was created; throwing here would hide the original failure.
+      return;
+    }
+    Files.deleteIfExists(filePath);
+  }
+
+  /**
+   * Snapshot-diff sync where a directory matched by the exclusion filter
+   * (".staging") is renamed to a name the filter does not match ("prod").
+   * The first run copies with the filter and checks ".staging" is absent
+   * on the target; the diff-based run is then checked with verifyCopy.
+   */
+  @Test
+  public void testSync10() throws Exception {
+    java.nio.file.Path filterFile = null;
+    try {
+      Path sourcePath = new Path(dfs.getWorkingDirectory(), "source");
+      initData10(sourcePath);
+      dfs.allowSnapshot(sourcePath);
+      dfs.createSnapshot(sourcePath, "s1");
+      filterFile = generateFilterFile("filters");
+      // Initial copy with the exclusion filter applied.
+      final DistCpOptions.Builder builder = new DistCpOptions.Builder(
+              new ArrayList<>(Arrays.asList(sourcePath)),
+              target)
+              .withFiltersFile(filterFile.toString())
+              .withSyncFolder(true);
+      new DistCp(conf, builder.build()).execute();
+      verifySync(dfs.getFileStatus(sourcePath),
+              dfs.getFileStatus(target), false, ".staging");
+
+      // Snapshot the target, rename on the source, and snapshot again so
+      // the s1..s2 diff contains the rename.
+      dfs.allowSnapshot(target);
+      dfs.createSnapshot(target, "s1");
+      changeData10(sourcePath);
+      dfs.createSnapshot(sourcePath, "s2");
+
+      final DistCpOptions.Builder diffBuilder = new DistCpOptions.Builder(
+              new ArrayList<>(Arrays.asList(sourcePath)),
+              target)
+              .withUseDiff("s1", "s2")
+              .withFiltersFile(filterFile.toString())
+              .withSyncFolder(true);
+      new DistCp(conf, diffBuilder.build()).execute();
+      verifyCopy(dfs.getFileStatus(sourcePath),
+              dfs.getFileStatus(target), false);
+    } finally {
+      // filterFile is still null when setup failed before the file was
+      // generated; skipping the delete keeps the original failure visible.
+      if (filterFile != null) {
+        deleteFilterFile(filterFile);
+      }
+    }
+  }
+
+  /**
+   * Creates the initial layout for testSync11: a file "f1" under a "prod"
+   * directory and a file "f1" under a "data" directory, both rooted at
+   * {@code dir}.
+   */
+  private void initData11(Path dir) throws Exception {
+    final Path prodFile = new Path(new Path(dir, "prod"), "f1");
+    final Path dataFile = new Path(new Path(dir, "data"), "f1");
+
+    DFSTestUtil.createFile(dfs, prodFile, BLOCK_SIZE, DATA_NUM, 0L);
+    DFSTestUtil.createFile(dfs, dataFile, BLOCK_SIZE, DATA_NUM, 0L);
+  }
+
+  /** Renames the "prod" directory under {@code dir} to ".staging". */
+  private void changeData11(Path dir) throws Exception {
+    dfs.rename(new Path(dir, "prod"), new Path(dir, ".staging"));
+  }
+
+  /**
+   * Recursively compares a source tree with a target tree, treating source
+   * entries whose name contains {@code deletedName} as excluded from the
+   * copy.
+   *
+   * @param s source file or directory
+   * @param t target file or directory
+   * @param compareName whether the names of {@code s} and {@code t} must
+   *                    be equal
+   * @param deletedName name fragment of entries that must not be present
+   *                    on the target
+   * @throws Exception if the file system cannot be read
+   */
+  private void verifySync(FileStatus s, FileStatus t, boolean compareName,
+                          String deletedName)
+          throws Exception {
+    Assert.assertEquals(s.isDirectory(), t.isDirectory());
+    if (compareName) {
+      Assert.assertEquals(s.getPath().getName(), t.getPath().getName());
+    }
+    if (!s.isDirectory()) {
+      // verify the file content is the same
+      byte[] sbytes = DFSTestUtil.readFileBuffer(dfs, s.getPath());
+      byte[] tbytes = DFSTestUtil.readFileBuffer(dfs, t.getPath());
+      Assert.assertArrayEquals(sbytes, tbytes);
+      return;
+    }
+
+    // Remove excluded entries from the source listing before pairing it
+    // with the target. Pairing the raw listings by index goes out of step
+    // as soon as an excluded entry exists only on the source, which leaves
+    // the remaining entries unverified.
+    ArrayList<FileStatus> expected = new ArrayList<>();
+    for (FileStatus child : dfs.listStatus(s.getPath())) {
+      if (!child.getPath().getName().contains(deletedName)) {
+        expected.add(child);
+      }
+    }
+
+    // No entry on the target may carry the excluded name, at any index.
+    FileStatus[] tlist = dfs.listStatus(t.getPath());
+    for (FileStatus child : tlist) {
+      Assert.assertFalse("Target is not synced as per exclusion filter",
+              child.getPath().getName().contains(deletedName));
+    }
+
+    // Like the index-based pairing this replaces, this relies on both
+    // listings being returned in the same order.
+    Assert.assertEquals(expected.size(), tlist.length);
+    for (int i = 0; i < tlist.length; i++) {
+      verifySync(expected.get(i), tlist[i], true, deletedName);
+    }
+  }
+
+  /**
+   * Snapshot-diff sync where a directory the filter does not match
+   * ("prod") is renamed to a name matched by the exclusion filter
+   * (".staging"). The first run is checked with verifyCopy; after the
+   * diff-based run the target is checked to hold no ".staging" entry.
+   */
+  @Test
+  public void testSync11() throws Exception {
+    java.nio.file.Path filterFile = null;
+    try {
+      Path sourcePath = new Path(dfs.getWorkingDirectory(), "source");
+      initData11(sourcePath);
+      dfs.allowSnapshot(sourcePath);
+      dfs.createSnapshot(sourcePath, "s1");
+      filterFile = generateFilterFile("filters");
+      // Initial copy with the exclusion filter applied.
+      final DistCpOptions.Builder builder = new DistCpOptions.Builder(
+              new ArrayList<>(Arrays.asList(sourcePath)),
+              target)
+              .withFiltersFile(filterFile.toString())
+              .withSyncFolder(true);
+      new DistCp(conf, builder.build()).execute();
+      verifyCopy(dfs.getFileStatus(sourcePath),
+              dfs.getFileStatus(target), false);
+
+      // Snapshot the target, rename on the source, and snapshot again so
+      // the s1..s2 diff contains the rename.
+      dfs.allowSnapshot(target);
+      dfs.createSnapshot(target, "s1");
+      changeData11(sourcePath);
+      dfs.createSnapshot(sourcePath, "s2");
+
+      final DistCpOptions.Builder diffBuilder = new DistCpOptions.Builder(
+              new ArrayList<>(Arrays.asList(sourcePath)),
+              target)
+              .withUseDiff("s1", "s2")
+              .withFiltersFile(filterFile.toString())
+              .withSyncFolder(true);
+      new DistCp(conf, diffBuilder.build()).execute();
+      verifySync(dfs.getFileStatus(sourcePath),
+              dfs.getFileStatus(target), false, ".staging");
+    } finally {
+      // filterFile is still null when setup failed before the file was
+      // generated; skipping the delete keeps the original failure visible.
+      if (filterFile != null) {
+        deleteFilterFile(filterFile);
+      }
+    }
+  }
 }