|
@@ -0,0 +1,868 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+package org.apache.hadoop.tools;
|
|
|
+
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FileStatus;
|
|
|
+import org.apache.hadoop.fs.FsShell;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.hdfs.DFSTestUtil;
|
|
|
+import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
|
+import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
|
+import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
|
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
|
|
+import org.apache.hadoop.io.IOUtils;
|
|
|
+import org.apache.hadoop.io.SequenceFile;
|
|
|
+import org.apache.hadoop.io.Text;
|
|
|
+import org.apache.hadoop.mapreduce.Mapper;
|
|
|
+import org.apache.hadoop.security.Credentials;
|
|
|
+import org.apache.hadoop.tools.mapred.CopyMapper;
|
|
|
+import org.junit.After;
|
|
|
+import org.junit.Assert;
|
|
|
+import org.junit.Before;
|
|
|
+import org.junit.Test;
|
|
|
+
|
|
|
+import java.io.ByteArrayOutputStream;
|
|
|
+import java.io.PrintStream;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.StringTokenizer;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Base class to test "-rdiff s2 s1".
|
|
|
+ * Shared by "-rdiff s2 s1 src tgt" and "-rdiff s2 s1 tgt tgt"
|
|
|
+ */
|
|
|
+public abstract class TestDistCpSyncReverseBase {
|
|
|
+ private MiniDFSCluster cluster;
|
|
|
+ private final Configuration conf = new HdfsConfiguration();
|
|
|
+ private DistributedFileSystem dfs;
|
|
|
+ private DistCpOptions options;
|
|
|
+ private Path source;
|
|
|
+ private boolean isSrcNotSameAsTgt = true;
|
|
|
+ private final Path target = new Path("/target");
|
|
|
+ private final long blockSize = 1024;
|
|
|
+ private final short dataNum = 1;
|
|
|
+
|
|
|
+ abstract void initSourcePath();
|
|
|
+
|
|
|
+ private static List<String> lsr(final String prefix,
|
|
|
+ final FsShell shell, Path rootDir) throws Exception {
|
|
|
+ return lsr(prefix, shell, rootDir.toString(), null);
|
|
|
+ }
|
|
|
+
|
|
|
+ private List<String> lsrSource(final String prefix,
|
|
|
+ final FsShell shell, Path rootDir) throws Exception {
|
|
|
+ final Path spath = isSrcNotSameAsTgt? rootDir :
|
|
|
+ new Path(rootDir.toString(),
|
|
|
+ HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s1");
|
|
|
+ return lsr(prefix, shell, spath.toString(), null);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static List<String> lsr(final String prefix,
|
|
|
+ final FsShell shell, String rootDir, String glob) throws Exception {
|
|
|
+ final String dir = glob == null ? rootDir : glob;
|
|
|
+ System.out.println(prefix + " lsr root=" + rootDir);
|
|
|
+ final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
|
|
|
+ final PrintStream out = new PrintStream(bytes);
|
|
|
+ final PrintStream oldOut = System.out;
|
|
|
+ final PrintStream oldErr = System.err;
|
|
|
+ System.setOut(out);
|
|
|
+ System.setErr(out);
|
|
|
+ final String results;
|
|
|
+ try {
|
|
|
+ Assert.assertEquals(0, shell.run(new String[] {"-lsr", dir }));
|
|
|
+ results = bytes.toString();
|
|
|
+ } finally {
|
|
|
+ IOUtils.closeStream(out);
|
|
|
+ System.setOut(oldOut);
|
|
|
+ System.setErr(oldErr);
|
|
|
+ }
|
|
|
+ System.out.println("lsr results:\n" + results);
|
|
|
+ String dirname = rootDir;
|
|
|
+ if (rootDir.lastIndexOf(Path.SEPARATOR) != -1) {
|
|
|
+ dirname = rootDir.substring(rootDir.lastIndexOf(Path.SEPARATOR));
|
|
|
+ }
|
|
|
+
|
|
|
+ final List<String> paths = new ArrayList<String>();
|
|
|
+ for (StringTokenizer t = new StringTokenizer(results, "\n"); t
|
|
|
+ .hasMoreTokens();) {
|
|
|
+ final String s = t.nextToken();
|
|
|
+ final int i = s.indexOf(dirname);
|
|
|
+ if (i >= 0) {
|
|
|
+ paths.add(s.substring(i + dirname.length()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Collections.sort(paths);
|
|
|
+ System.out
|
|
|
+ .println("lsr paths = " + paths.toString().replace(", ", ",\n "));
|
|
|
+ return paths;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setSource(final Path src) {
|
|
|
+ this.source = src;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setSrcNotSameAsTgt(final boolean srcNotSameAsTgt) {
|
|
|
+ isSrcNotSameAsTgt = srcNotSameAsTgt;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Before
|
|
|
+ public void setUp() throws Exception {
|
|
|
+ initSourcePath();
|
|
|
+
|
|
|
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dataNum).build();
|
|
|
+ cluster.waitActive();
|
|
|
+
|
|
|
+ dfs = cluster.getFileSystem();
|
|
|
+ if (isSrcNotSameAsTgt) {
|
|
|
+ dfs.mkdirs(source);
|
|
|
+ }
|
|
|
+ dfs.mkdirs(target);
|
|
|
+
|
|
|
+ options = new DistCpOptions(Arrays.asList(source), target);
|
|
|
+ options.setSyncFolder(true);
|
|
|
+ options.setUseRdiff("s2", "s1");
|
|
|
+ options.appendToConf(conf);
|
|
|
+
|
|
|
+ conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, target.toString());
|
|
|
+ conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, target.toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ @After
|
|
|
+ public void tearDown() throws Exception {
|
|
|
+ IOUtils.cleanup(null, dfs);
|
|
|
+ if (cluster != null) {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test the sync returns false in the following scenarios:
|
|
|
+ * 1. the source/target dir are not snapshottable dir
|
|
|
+ * 2. the source/target does not have the given snapshots
|
|
|
+ * 3. changes have been made in target
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testFallback() throws Exception {
|
|
|
+ // the source/target dir are not snapshottable dir
|
|
|
+ Assert.assertFalse(sync());
|
|
|
+ // make sure the source path has been updated to the snapshot path
|
|
|
+ final Path spath = new Path(source,
|
|
|
+ HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s1");
|
|
|
+ Assert.assertEquals(spath, options.getSourcePaths().get(0));
|
|
|
+
|
|
|
+ // reset source path in options
|
|
|
+ options.setSourcePaths(Arrays.asList(source));
|
|
|
+ // the source/target does not have the given snapshots
|
|
|
+ dfs.allowSnapshot(source);
|
|
|
+ dfs.allowSnapshot(target);
|
|
|
+ Assert.assertFalse(sync());
|
|
|
+ Assert.assertEquals(spath, options.getSourcePaths().get(0));
|
|
|
+
|
|
|
+ // reset source path in options
|
|
|
+ options.setSourcePaths(Arrays.asList(source));
|
|
|
+ this.enableAndCreateFirstSnapshot();
|
|
|
+ dfs.createSnapshot(target, "s2");
|
|
|
+ Assert.assertTrue(sync());
|
|
|
+
|
|
|
+ // reset source paths in options
|
|
|
+ options.setSourcePaths(Arrays.asList(source));
|
|
|
+ // changes have been made in target
|
|
|
+ final Path subTarget = new Path(target, "sub");
|
|
|
+ dfs.mkdirs(subTarget);
|
|
|
+ Assert.assertFalse(sync());
|
|
|
+ // make sure the source path has been updated to the snapshot path
|
|
|
+ Assert.assertEquals(spath, options.getSourcePaths().get(0));
|
|
|
+
|
|
|
+ // reset source paths in options
|
|
|
+ options.setSourcePaths(Arrays.asList(source));
|
|
|
+ dfs.delete(subTarget, true);
|
|
|
+ Assert.assertTrue(sync());
|
|
|
+ }
|
|
|
+
|
|
|
+ private void syncAndVerify() throws Exception {
|
|
|
+
|
|
|
+ final FsShell shell = new FsShell(conf);
|
|
|
+ lsrSource("Before sync source: ", shell, source);
|
|
|
+ lsr("Before sync target: ", shell, target);
|
|
|
+
|
|
|
+ Assert.assertTrue(sync());
|
|
|
+
|
|
|
+ lsrSource("After sync source: ", shell, source);
|
|
|
+ lsr("After sync target: ", shell, target);
|
|
|
+
|
|
|
+ verifyCopy(dfs.getFileStatus(source), dfs.getFileStatus(target), false);
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean sync() throws Exception {
|
|
|
+ DistCpSync distCpSync = new DistCpSync(options, conf);
|
|
|
+ return distCpSync.sync();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void enableAndCreateFirstSnapshot() throws Exception {
|
|
|
+ if (isSrcNotSameAsTgt) {
|
|
|
+ dfs.allowSnapshot(source);
|
|
|
+ dfs.createSnapshot(source, "s1");
|
|
|
+ }
|
|
|
+ dfs.allowSnapshot(target);
|
|
|
+ dfs.createSnapshot(target, "s1");
|
|
|
+ }
|
|
|
+
|
|
|
+ private void createSecondSnapshotAtTarget() throws Exception {
|
|
|
+ dfs.createSnapshot(target, "s2");
|
|
|
+ }
|
|
|
+
|
|
|
+ private void createMiddleSnapshotAtTarget() throws Exception {
|
|
|
+ dfs.createSnapshot(target, "s1.5");
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * create some files and directories under the given directory.
|
|
|
+ * the final subtree looks like this:
|
|
|
+ * dir/
|
|
|
+ * foo/ bar/
|
|
|
+ * d1/ f1 d2/ f2
|
|
|
+ * f3 f4
|
|
|
+ */
|
|
|
+ private void initData(Path dir) throws Exception {
|
|
|
+ final Path foo = new Path(dir, "foo");
|
|
|
+ final Path bar = new Path(dir, "bar");
|
|
|
+ final Path d1 = new Path(foo, "d1");
|
|
|
+ final Path f1 = new Path(foo, "f1");
|
|
|
+ final Path d2 = new Path(bar, "d2");
|
|
|
+ final Path f2 = new Path(bar, "f2");
|
|
|
+ final Path f3 = new Path(d1, "f3");
|
|
|
+ final Path f4 = new Path(d2, "f4");
|
|
|
+
|
|
|
+ DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0);
|
|
|
+ DFSTestUtil.createFile(dfs, f2, blockSize, dataNum, 0);
|
|
|
+ DFSTestUtil.createFile(dfs, f3, blockSize, dataNum, 0);
|
|
|
+ DFSTestUtil.createFile(dfs, f4, blockSize, dataNum, 0);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * make some changes under the given directory (created in the above way).
|
|
|
+ * 1. rename dir/foo/d1 to dir/bar/d1
|
|
|
+ * 2. delete dir/bar/d1/f3
|
|
|
+ * 3. rename dir/foo to /dir/bar/d1/foo
|
|
|
+ * 4. delete dir/bar/d1/foo/f1
|
|
|
+ * 5. create file dir/bar/d1/foo/f1 whose size is 2*BLOCK_SIZE
|
|
|
+ * 6. append one BLOCK to file dir/bar/f2
|
|
|
+ * 7. rename dir/bar to dir/foo
|
|
|
+ *
|
|
|
+ * Thus after all these ops the subtree looks like this:
|
|
|
+ * dir/
|
|
|
+ * foo/
|
|
|
+ * d1/ f2(A) d2/
|
|
|
+ * foo/ f4
|
|
|
+ * f1(new)
|
|
|
+ */
|
|
|
+ private int changeData(Path dir) throws Exception {
|
|
|
+ final Path foo = new Path(dir, "foo");
|
|
|
+ final Path bar = new Path(dir, "bar");
|
|
|
+ final Path d1 = new Path(foo, "d1");
|
|
|
+ final Path f2 = new Path(bar, "f2");
|
|
|
+
|
|
|
+ final Path bar_d1 = new Path(bar, "d1");
|
|
|
+ int numDeletedModified = 0;
|
|
|
+ dfs.rename(d1, bar_d1);
|
|
|
+ numDeletedModified += 1; // modify ./foo
|
|
|
+ numDeletedModified += 1; // modify ./bar
|
|
|
+ final Path f3 = new Path(bar_d1, "f3");
|
|
|
+ dfs.delete(f3, true);
|
|
|
+ numDeletedModified += 1; // delete f3
|
|
|
+ final Path newfoo = new Path(bar_d1, "foo");
|
|
|
+ dfs.rename(foo, newfoo);
|
|
|
+ numDeletedModified += 1; // modify ./foo/d1
|
|
|
+ final Path f1 = new Path(newfoo, "f1");
|
|
|
+ dfs.delete(f1, true);
|
|
|
+ numDeletedModified += 1; // delete ./foo/f1
|
|
|
+ DFSTestUtil.createFile(dfs, f1, 2 * blockSize, dataNum, 0);
|
|
|
+ DFSTestUtil.appendFile(dfs, f2, (int) blockSize);
|
|
|
+ numDeletedModified += 1; // modify ./bar/f2
|
|
|
+ dfs.rename(bar, new Path(dir, "foo"));
|
|
|
+ return numDeletedModified;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test the basic functionality.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testSync() throws Exception {
|
|
|
+ if (isSrcNotSameAsTgt) {
|
|
|
+ initData(source);
|
|
|
+ }
|
|
|
+ initData(target);
|
|
|
+ enableAndCreateFirstSnapshot();
|
|
|
+
|
|
|
+ final FsShell shell = new FsShell(conf);
|
|
|
+
|
|
|
+ lsrSource("Before source: ", shell, source);
|
|
|
+ lsr("Before target: ", shell, target);
|
|
|
+
|
|
|
+ // make changes under target
|
|
|
+ int numDeletedModified = changeData(target);
|
|
|
+
|
|
|
+ createSecondSnapshotAtTarget();
|
|
|
+
|
|
|
+ SnapshotDiffReport report = dfs.getSnapshotDiffReport(target, "s2", "s1");
|
|
|
+ System.out.println(report);
|
|
|
+
|
|
|
+ DistCpSync distCpSync = new DistCpSync(options, conf);
|
|
|
+
|
|
|
+ lsr("Before sync target: ", shell, target);
|
|
|
+
|
|
|
+ // do the sync
|
|
|
+ Assert.assertTrue(distCpSync.sync());
|
|
|
+
|
|
|
+ lsr("After sync target: ", shell, target);
|
|
|
+
|
|
|
+ // make sure the source path has been updated to the snapshot path
|
|
|
+ final Path spath = new Path(source,
|
|
|
+ HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s1");
|
|
|
+ Assert.assertEquals(spath, options.getSourcePaths().get(0));
|
|
|
+
|
|
|
+ // build copy listing
|
|
|
+ final Path listingPath = new Path("/tmp/META/fileList.seq");
|
|
|
+ CopyListing listing = new SimpleCopyListing(conf, new Credentials(),
|
|
|
+ distCpSync);
|
|
|
+ listing.buildListing(listingPath, options);
|
|
|
+
|
|
|
+ Map<Text, CopyListingFileStatus> copyListing = getListing(listingPath);
|
|
|
+ CopyMapper copyMapper = new CopyMapper();
|
|
|
+ StubContext stubContext = new StubContext(conf, null, 0);
|
|
|
+ Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
|
|
|
+ stubContext.getContext();
|
|
|
+ // Enable append
|
|
|
+ context.getConfiguration().setBoolean(
|
|
|
+ DistCpOptionSwitch.APPEND.getConfigLabel(), true);
|
|
|
+ copyMapper.setup(context);
|
|
|
+ for (Map.Entry<Text, CopyListingFileStatus> entry :
|
|
|
+ copyListing.entrySet()) {
|
|
|
+ copyMapper.map(entry.getKey(), entry.getValue(), context);
|
|
|
+ }
|
|
|
+
|
|
|
+ lsrSource("After mapper source: ", shell, source);
|
|
|
+ lsr("After mapper target: ", shell, target);
|
|
|
+
|
|
|
+ // verify that we only list modified and created files/directories
|
|
|
+ Assert.assertEquals(numDeletedModified, copyListing.size());
|
|
|
+
|
|
|
+ // verify that we only copied new appended data of f2 and the new file f1
|
|
|
+ Assert.assertEquals(blockSize * 3, stubContext.getReporter()
|
|
|
+ .getCounter(CopyMapper.Counter.BYTESCOPIED).getValue());
|
|
|
+
|
|
|
+ // verify the source and target now has the same structure
|
|
|
+ verifyCopy(dfs.getFileStatus(spath), dfs.getFileStatus(target), false);
|
|
|
+ }
|
|
|
+
|
|
|
+ private Map<Text, CopyListingFileStatus> getListing(Path listingPath)
|
|
|
+ throws Exception {
|
|
|
+ SequenceFile.Reader reader = null;
|
|
|
+ Map<Text, CopyListingFileStatus> values = new HashMap<>();
|
|
|
+ try {
|
|
|
+ reader = new SequenceFile.Reader(conf,
|
|
|
+ SequenceFile.Reader.file(listingPath));
|
|
|
+ Text key = new Text();
|
|
|
+ CopyListingFileStatus value = new CopyListingFileStatus();
|
|
|
+ while (reader.next(key, value)) {
|
|
|
+ values.put(key, value);
|
|
|
+ key = new Text();
|
|
|
+ value = new CopyListingFileStatus();
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ if (reader != null) {
|
|
|
+ reader.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return values;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void verifyCopy(FileStatus s, FileStatus t, boolean compareName)
|
|
|
+ throws Exception {
|
|
|
+ Assert.assertEquals(s.isDirectory(), t.isDirectory());
|
|
|
+ if (compareName) {
|
|
|
+ Assert.assertEquals(s.getPath().getName(), t.getPath().getName());
|
|
|
+ }
|
|
|
+ if (!s.isDirectory()) {
|
|
|
+ // verify the file content is the same
|
|
|
+ byte[] sbytes = DFSTestUtil.readFileBuffer(dfs, s.getPath());
|
|
|
+ byte[] tbytes = DFSTestUtil.readFileBuffer(dfs, t.getPath());
|
|
|
+ Assert.assertArrayEquals(sbytes, tbytes);
|
|
|
+ } else {
|
|
|
+ FileStatus[] slist = dfs.listStatus(s.getPath());
|
|
|
+ FileStatus[] tlist = dfs.listStatus(t.getPath());
|
|
|
+ Assert.assertEquals(slist.length, tlist.length);
|
|
|
+ for (int i = 0; i < slist.length; i++) {
|
|
|
+ verifyCopy(slist[i], tlist[i], true);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test the case that "current" is snapshotted as "s2".
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testSyncWithCurrent() throws Exception {
|
|
|
+ options.setUseRdiff(".", "s1");
|
|
|
+ if (isSrcNotSameAsTgt) {
|
|
|
+ initData(source);
|
|
|
+ }
|
|
|
+ initData(target);
|
|
|
+ enableAndCreateFirstSnapshot();
|
|
|
+
|
|
|
+ // make changes under target
|
|
|
+ changeData(target);
|
|
|
+
|
|
|
+ // do the sync
|
|
|
+ Assert.assertTrue(sync());
|
|
|
+ final Path spath = new Path(source,
|
|
|
+ HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s1");
|
|
|
+ // make sure the source path is still unchanged
|
|
|
+ Assert.assertEquals(spath, options.getSourcePaths().get(0));
|
|
|
+ }
|
|
|
+
|
|
|
+ private void initData2(Path dir) throws Exception {
|
|
|
+ final Path test = new Path(dir, "test");
|
|
|
+ final Path foo = new Path(dir, "foo");
|
|
|
+ final Path bar = new Path(dir, "bar");
|
|
|
+ final Path f1 = new Path(test, "f1");
|
|
|
+ final Path f2 = new Path(foo, "f2");
|
|
|
+ final Path f3 = new Path(bar, "f3");
|
|
|
+
|
|
|
+ DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0L);
|
|
|
+ DFSTestUtil.createFile(dfs, f2, blockSize, dataNum, 1L);
|
|
|
+ DFSTestUtil.createFile(dfs, f3, blockSize, dataNum, 2L);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void changeData2(Path dir) throws Exception {
|
|
|
+ final Path tmpFoo = new Path(dir, "tmpFoo");
|
|
|
+ final Path test = new Path(dir, "test");
|
|
|
+ final Path foo = new Path(dir, "foo");
|
|
|
+ final Path bar = new Path(dir, "bar");
|
|
|
+
|
|
|
+ dfs.rename(test, tmpFoo);
|
|
|
+ dfs.rename(foo, test);
|
|
|
+ dfs.rename(bar, foo);
|
|
|
+ dfs.rename(tmpFoo, bar);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testSync2() throws Exception {
|
|
|
+ if (isSrcNotSameAsTgt) {
|
|
|
+ initData2(source);
|
|
|
+ }
|
|
|
+ initData2(target);
|
|
|
+ enableAndCreateFirstSnapshot();
|
|
|
+
|
|
|
+ // make changes under target
|
|
|
+ changeData2(target);
|
|
|
+
|
|
|
+ createSecondSnapshotAtTarget();
|
|
|
+
|
|
|
+ SnapshotDiffReport report = dfs.getSnapshotDiffReport(target, "s2", "s1");
|
|
|
+ System.out.println(report);
|
|
|
+
|
|
|
+ syncAndVerify();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void initData3(Path dir) throws Exception {
|
|
|
+ final Path test = new Path(dir, "test");
|
|
|
+ final Path foo = new Path(dir, "foo");
|
|
|
+ final Path bar = new Path(dir, "bar");
|
|
|
+ final Path f1 = new Path(test, "file");
|
|
|
+ final Path f2 = new Path(foo, "file");
|
|
|
+ final Path f3 = new Path(bar, "file");
|
|
|
+
|
|
|
+ DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0L);
|
|
|
+ DFSTestUtil.createFile(dfs, f2, blockSize * 2, dataNum, 1L);
|
|
|
+ DFSTestUtil.createFile(dfs, f3, blockSize * 3, dataNum, 2L);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void changeData3(Path dir) throws Exception {
|
|
|
+ final Path test = new Path(dir, "test");
|
|
|
+ final Path foo = new Path(dir, "foo");
|
|
|
+ final Path bar = new Path(dir, "bar");
|
|
|
+ final Path f1 = new Path(test, "file");
|
|
|
+ final Path f2 = new Path(foo, "file");
|
|
|
+ final Path f3 = new Path(bar, "file");
|
|
|
+ final Path newf1 = new Path(test, "newfile");
|
|
|
+ final Path newf2 = new Path(foo, "newfile");
|
|
|
+ final Path newf3 = new Path(bar, "newfile");
|
|
|
+
|
|
|
+ dfs.rename(f1, newf1);
|
|
|
+ dfs.rename(f2, newf2);
|
|
|
+ dfs.rename(f3, newf3);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test a case where there are multiple source files with the same name.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testSync3() throws Exception {
|
|
|
+ if (isSrcNotSameAsTgt) {
|
|
|
+ initData3(source);
|
|
|
+ }
|
|
|
+ initData3(target);
|
|
|
+ enableAndCreateFirstSnapshot();
|
|
|
+
|
|
|
+ // make changes under target
|
|
|
+ changeData3(target);
|
|
|
+
|
|
|
+ createSecondSnapshotAtTarget();
|
|
|
+
|
|
|
+ SnapshotDiffReport report = dfs.getSnapshotDiffReport(target, "s2", "s1");
|
|
|
+ System.out.println(report);
|
|
|
+
|
|
|
+ syncAndVerify();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void initData4(Path dir) throws Exception {
|
|
|
+ final Path d1 = new Path(dir, "d1");
|
|
|
+ final Path d2 = new Path(d1, "d2");
|
|
|
+ final Path f1 = new Path(d2, "f1");
|
|
|
+
|
|
|
+ DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0L);
|
|
|
+ }
|
|
|
+
|
|
|
+ private int changeData4(Path dir) throws Exception {
|
|
|
+ final Path d1 = new Path(dir, "d1");
|
|
|
+ final Path d11 = new Path(dir, "d11");
|
|
|
+ final Path d2 = new Path(d1, "d2");
|
|
|
+ final Path d21 = new Path(d1, "d21");
|
|
|
+ final Path f1 = new Path(d2, "f1");
|
|
|
+
|
|
|
+ int numDeletedAndModified = 0;
|
|
|
+ dfs.delete(f1, false);
|
|
|
+ numDeletedAndModified += 1;
|
|
|
+ dfs.rename(d2, d21);
|
|
|
+ numDeletedAndModified += 1;
|
|
|
+ dfs.rename(d1, d11);
|
|
|
+ numDeletedAndModified += 1;
|
|
|
+ return numDeletedAndModified;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test a case where multiple level dirs are renamed.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testSync4() throws Exception {
|
|
|
+ if (isSrcNotSameAsTgt) {
|
|
|
+ initData4(source);
|
|
|
+ }
|
|
|
+ initData4(target);
|
|
|
+ enableAndCreateFirstSnapshot();
|
|
|
+
|
|
|
+ final FsShell shell = new FsShell(conf);
|
|
|
+ lsr("Before change target: ", shell, target);
|
|
|
+
|
|
|
+ // make changes under target
|
|
|
+ int numDeletedAndModified = changeData4(target);
|
|
|
+
|
|
|
+ createSecondSnapshotAtTarget();
|
|
|
+
|
|
|
+ SnapshotDiffReport report = dfs.getSnapshotDiffReport(target, "s2", "s1");
|
|
|
+ System.out.println(report);
|
|
|
+
|
|
|
+ testAndVerify(numDeletedAndModified);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void initData5(Path dir) throws Exception {
|
|
|
+ final Path d1 = new Path(dir, "d1");
|
|
|
+ final Path d2 = new Path(dir, "d2");
|
|
|
+ final Path f1 = new Path(d1, "f1");
|
|
|
+ final Path f2 = new Path(d2, "f2");
|
|
|
+
|
|
|
+ DFSTestUtil.createFile(dfs, f1, blockSize, dataNum, 0L);
|
|
|
+ DFSTestUtil.createFile(dfs, f2, blockSize, dataNum, 0L);
|
|
|
+ }
|
|
|
+
|
|
|
+ private int changeData5(Path dir) throws Exception {
|
|
|
+ final Path d1 = new Path(dir, "d1");
|
|
|
+ final Path d2 = new Path(dir, "d2");
|
|
|
+ final Path f1 = new Path(d1, "f1");
|
|
|
+ final Path tmp = new Path(dir, "tmp");
|
|
|
+
|
|
|
+ int numDeletedAndModified = 0;
|
|
|
+ dfs.delete(f1, false);
|
|
|
+ numDeletedAndModified += 1;
|
|
|
+ dfs.rename(d1, tmp);
|
|
|
+ numDeletedAndModified += 1;
|
|
|
+ dfs.rename(d2, d1);
|
|
|
+ numDeletedAndModified += 1;
|
|
|
+ final Path f2 = new Path(d1, "f2");
|
|
|
+ dfs.delete(f2, false);
|
|
|
+ numDeletedAndModified += 1;
|
|
|
+ return numDeletedAndModified;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test a case with different delete and rename sequences.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testSync5() throws Exception {
|
|
|
+ if (isSrcNotSameAsTgt) {
|
|
|
+ initData5(source);
|
|
|
+ }
|
|
|
+ initData5(target);
|
|
|
+ enableAndCreateFirstSnapshot();
|
|
|
+
|
|
|
+ // make changes under target
|
|
|
+ int numDeletedAndModified = changeData5(target);
|
|
|
+
|
|
|
+ createSecondSnapshotAtTarget();
|
|
|
+
|
|
|
+ SnapshotDiffReport report = dfs.getSnapshotDiffReport(target, "s2", "s1");
|
|
|
+ System.out.println(report);
|
|
|
+
|
|
|
+ testAndVerify(numDeletedAndModified);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void testAndVerify(int numDeletedAndModified)
|
|
|
+ throws Exception{
|
|
|
+ SnapshotDiffReport report = dfs.getSnapshotDiffReport(target, "s2", "s1");
|
|
|
+ System.out.println(report);
|
|
|
+
|
|
|
+ final FsShell shell = new FsShell(conf);
|
|
|
+
|
|
|
+ lsrSource("Before sync source: ", shell, source);
|
|
|
+ lsr("Before sync target: ", shell, target);
|
|
|
+
|
|
|
+ DistCpSync distCpSync = new DistCpSync(options, conf);
|
|
|
+ // do the sync
|
|
|
+ distCpSync.sync();
|
|
|
+
|
|
|
+ lsr("After sync target: ", shell, target);
|
|
|
+
|
|
|
+ // make sure the source path has been updated to the snapshot path
|
|
|
+ final Path spath = new Path(source,
|
|
|
+ HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s1");
|
|
|
+ Assert.assertEquals(spath, options.getSourcePaths().get(0));
|
|
|
+
|
|
|
+ // build copy listing
|
|
|
+ final Path listingPath = new Path("/tmp/META/fileList.seq");
|
|
|
+ CopyListing listing = new SimpleCopyListing(conf, new Credentials(), distCpSync);
|
|
|
+ listing.buildListing(listingPath, options);
|
|
|
+
|
|
|
+ Map<Text, CopyListingFileStatus> copyListing = getListing(listingPath);
|
|
|
+ CopyMapper copyMapper = new CopyMapper();
|
|
|
+ StubContext stubContext = new StubContext(conf, null, 0);
|
|
|
+ Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
|
|
|
+ stubContext.getContext();
|
|
|
+ // Enable append
|
|
|
+ context.getConfiguration().setBoolean(
|
|
|
+ DistCpOptionSwitch.APPEND.getConfigLabel(), true);
|
|
|
+ copyMapper.setup(context);
|
|
|
+ for (Map.Entry<Text, CopyListingFileStatus> entry :
|
|
|
+ copyListing.entrySet()) {
|
|
|
+ copyMapper.map(entry.getKey(), entry.getValue(), context);
|
|
|
+ }
|
|
|
+
|
|
|
+ // verify that we only list modified and created files/directories
|
|
|
+ Assert.assertEquals(numDeletedAndModified, copyListing.size());
|
|
|
+
|
|
|
+ lsr("After Copy target: ", shell, target);
|
|
|
+
|
|
|
+ // verify the source and target now has the same structure
|
|
|
+ verifyCopy(dfs.getFileStatus(spath), dfs.getFileStatus(target), false);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void initData6(Path dir) throws Exception {
|
|
|
+ final Path foo = new Path(dir, "foo");
|
|
|
+ final Path bar = new Path(dir, "bar");
|
|
|
+ final Path foo_f1 = new Path(foo, "f1");
|
|
|
+ final Path bar_f1 = new Path(bar, "f1");
|
|
|
+
|
|
|
+ DFSTestUtil.createFile(dfs, foo_f1, blockSize, dataNum, 0L);
|
|
|
+ DFSTestUtil.createFile(dfs, bar_f1, blockSize, dataNum, 0L);
|
|
|
+ }
|
|
|
+
|
|
|
+ private int changeData6(Path dir) throws Exception {
|
|
|
+ final Path foo = new Path(dir, "foo");
|
|
|
+ final Path bar = new Path(dir, "bar");
|
|
|
+ final Path foo2 = new Path(dir, "foo2");
|
|
|
+ final Path foo_f1 = new Path(foo, "f1");
|
|
|
+
|
|
|
+ int numDeletedModified = 0;
|
|
|
+ dfs.rename(foo, foo2);
|
|
|
+ dfs.rename(bar, foo);
|
|
|
+ dfs.rename(foo2, bar);
|
|
|
+ DFSTestUtil.appendFile(dfs, foo_f1, (int) blockSize);
|
|
|
+ numDeletedModified += 1; // modify ./bar/f1
|
|
|
+ return numDeletedModified;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test a case where there is a cycle in renaming dirs.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testSync6() throws Exception {
|
|
|
+ if (isSrcNotSameAsTgt) {
|
|
|
+ initData6(source);
|
|
|
+ }
|
|
|
+ initData6(target);
|
|
|
+ enableAndCreateFirstSnapshot();
|
|
|
+ int numDeletedModified = changeData6(target);
|
|
|
+
|
|
|
+ createSecondSnapshotAtTarget();
|
|
|
+
|
|
|
+ testAndVerify(numDeletedModified);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void initData7(Path dir) throws Exception {
|
|
|
+ final Path foo = new Path(dir, "foo");
|
|
|
+ final Path bar = new Path(dir, "bar");
|
|
|
+ final Path foo_f1 = new Path(foo, "f1");
|
|
|
+ final Path bar_f1 = new Path(bar, "f1");
|
|
|
+
|
|
|
+ DFSTestUtil.createFile(dfs, foo_f1, blockSize, dataNum, 0L);
|
|
|
+ DFSTestUtil.createFile(dfs, bar_f1, blockSize, dataNum, 0L);
|
|
|
+ }
|
|
|
+
|
|
|
+ private int changeData7(Path dir) throws Exception {
|
|
|
+ final Path foo = new Path(dir, "foo");
|
|
|
+ final Path foo2 = new Path(dir, "foo2");
|
|
|
+ final Path foo_f1 = new Path(foo, "f1");
|
|
|
+ final Path foo2_f2 = new Path(foo2, "f2");
|
|
|
+ final Path foo_d1 = new Path(foo, "d1");
|
|
|
+ final Path foo_d1_f3 = new Path(foo_d1, "f3");
|
|
|
+
|
|
|
+ int numDeletedAndModified = 0;
|
|
|
+ dfs.rename(foo, foo2);
|
|
|
+ DFSTestUtil.createFile(dfs, foo_f1, blockSize, dataNum, 0L);
|
|
|
+ DFSTestUtil.appendFile(dfs, foo_f1, (int) blockSize);
|
|
|
+ dfs.rename(foo_f1, foo2_f2);
|
|
|
+ /*
|
|
|
+ * Difference between snapshot s1 and current directory under directory
|
|
|
+ /target:
|
|
|
+M .
|
|
|
++ ./foo
|
|
|
+R ./foo -> ./foo2
|
|
|
+M ./foo
|
|
|
++ ./foo/f2
|
|
|
+ */
|
|
|
+ numDeletedAndModified += 1; // "M ./foo"
|
|
|
+ DFSTestUtil.createFile(dfs, foo_d1_f3, blockSize, dataNum, 0L);
|
|
|
+ return numDeletedAndModified;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test a case where rename a dir, then create a new dir with the same name
|
|
|
+ * and sub dir.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testSync7() throws Exception {
|
|
|
+ if (isSrcNotSameAsTgt) {
|
|
|
+ initData7(source);
|
|
|
+ }
|
|
|
+ initData7(target);
|
|
|
+ enableAndCreateFirstSnapshot();
|
|
|
+ int numDeletedAndModified = changeData7(target);
|
|
|
+
|
|
|
+ createSecondSnapshotAtTarget();
|
|
|
+
|
|
|
+ testAndVerify(numDeletedAndModified);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void initData8(Path dir) throws Exception {
|
|
|
+ final Path foo = new Path(dir, "foo");
|
|
|
+ final Path bar = new Path(dir, "bar");
|
|
|
+ final Path d1 = new Path(dir, "d1");
|
|
|
+ final Path foo_f1 = new Path(foo, "f1");
|
|
|
+ final Path bar_f1 = new Path(bar, "f1");
|
|
|
+ final Path d1_f1 = new Path(d1, "f1");
|
|
|
+
|
|
|
+ DFSTestUtil.createFile(dfs, foo_f1, blockSize, dataNum, 0L);
|
|
|
+ DFSTestUtil.createFile(dfs, bar_f1, blockSize, dataNum, 0L);
|
|
|
+ DFSTestUtil.createFile(dfs, d1_f1, blockSize, dataNum, 0L);
|
|
|
+ }
|
|
|
+
|
|
|
+ private int changeData8(Path dir, boolean createMiddleSnapshot)
|
|
|
+ throws Exception {
|
|
|
+ final Path foo = new Path(dir, "foo");
|
|
|
+ final Path createdDir = new Path(dir, "c");
|
|
|
+ final Path d1 = new Path(dir, "d1");
|
|
|
+ final Path d1_f1 = new Path(d1, "f1");
|
|
|
+ final Path createdDir_f1 = new Path(createdDir, "f1");
|
|
|
+ final Path foo_f3 = new Path(foo, "f3");
|
|
|
+ final Path new_foo = new Path(createdDir, "foo");
|
|
|
+ final Path foo_f4 = new Path(foo, "f4");
|
|
|
+ final Path foo_d1 = new Path(foo, "d1");
|
|
|
+ final Path bar = new Path(dir, "bar");
|
|
|
+ final Path bar1 = new Path(dir, "bar1");
|
|
|
+
|
|
|
+ int numDeletedAndModified = 0;
|
|
|
+ DFSTestUtil.createFile(dfs, foo_f3, blockSize, dataNum, 0L);
|
|
|
+ DFSTestUtil.createFile(dfs, createdDir_f1, blockSize, dataNum, 0L);
|
|
|
+ dfs.rename(createdDir_f1, foo_f4);
|
|
|
+ dfs.rename(d1_f1, createdDir_f1); // rename ./d1/f1 -> ./c/f1
|
|
|
+ numDeletedAndModified += 1; // modify ./c/foo/d1
|
|
|
+
|
|
|
+ if (createMiddleSnapshot) {
|
|
|
+ this.createMiddleSnapshotAtTarget();
|
|
|
+ }
|
|
|
+
|
|
|
+ dfs.rename(d1, foo_d1);
|
|
|
+ numDeletedAndModified += 1; // modify ./c/foo
|
|
|
+ dfs.rename(foo, new_foo);
|
|
|
+ dfs.rename(bar, bar1);
|
|
|
+ return numDeletedAndModified;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test a case where create a dir, then mv a existed dir into it.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testSync8() throws Exception {
|
|
|
+ if (isSrcNotSameAsTgt) {
|
|
|
+ initData8(source);
|
|
|
+ }
|
|
|
+ initData8(target);
|
|
|
+ enableAndCreateFirstSnapshot();
|
|
|
+ int numDeletedModified = changeData8(target, false);
|
|
|
+
|
|
|
+ createSecondSnapshotAtTarget();
|
|
|
+
|
|
|
+ testAndVerify(numDeletedModified);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test a case where create a dir, then mv a existed dir into it.
|
|
|
+ * The difference between this one and testSync8 is, this one
|
|
|
+ * also creates a snapshot s1.5 in between s1 and s2.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testSync9() throws Exception {
|
|
|
+ if (isSrcNotSameAsTgt) {
|
|
|
+ initData8(source);
|
|
|
+ }
|
|
|
+ initData8(target);
|
|
|
+ enableAndCreateFirstSnapshot();
|
|
|
+ int numDeletedModified = changeData8(target, true);
|
|
|
+
|
|
|
+ createSecondSnapshotAtTarget();
|
|
|
+
|
|
|
+ testAndVerify(numDeletedModified);
|
|
|
+ }
|
|
|
+}
|