|
@@ -29,6 +29,7 @@ import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.LocatedFileStatus;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
@@ -72,6 +73,9 @@ public abstract class AbstractContractDistCpTest
|
|
|
private static final Logger LOG =
|
|
|
LoggerFactory.getLogger(AbstractContractDistCpTest.class);
|
|
|
|
|
|
+ /** Using offset to change modification time in tests. */
|
|
|
+ private static final long MODIFICATION_TIME_OFFSET = 10000;
|
|
|
+
|
|
|
public static final String SCALE_TEST_DISTCP_FILE_SIZE_KB
|
|
|
= "scale.test.distcp.file.size.kb";
|
|
|
|
|
@@ -354,6 +358,29 @@ public abstract class AbstractContractDistCpTest
|
|
|
.withOverwrite(false)));
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Run distcp -update srcDir destDir.
|
|
|
+ * @param srcDir local source directory
|
|
|
+ * @param destDir remote destination directory.
|
|
|
+ * @return the completed job
|
|
|
+ * @throws Exception any failure.
|
|
|
+ */
|
|
|
+ private Job distCpUpdateWithFs(final Path srcDir, final Path destDir,
|
|
|
+ FileSystem sourceFs, FileSystem targetFs)
|
|
|
+ throws Exception {
|
|
|
+ describe("\nDistcp -update from " + srcDir + " to " + destDir);
|
|
|
+ lsR("Source Fs to update", sourceFs, srcDir);
|
|
|
+ lsR("Target Fs before update", targetFs, destDir);
|
|
|
+ return runDistCp(buildWithStandardOptions(
|
|
|
+ new DistCpOptions.Builder(
|
|
|
+ Collections.singletonList(srcDir), destDir)
|
|
|
+ .withDeleteMissing(true)
|
|
|
+ .withSyncFolder(true)
|
|
|
+ .withSkipCRC(false)
|
|
|
+ .withDirectWrite(shouldUseDirectWrite())
|
|
|
+ .withOverwrite(false)));
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Update the source directories as various tests expect,
|
|
|
* including adding a new file.
|
|
@@ -857,4 +884,122 @@ public abstract class AbstractContractDistCpTest
|
|
|
verifyFileContents(localFS, dest, block);
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testDistCpUpdateCheckFileSkip() throws Exception {
|
|
|
+ describe("Distcp update to check file skips.");
|
|
|
+
|
|
|
+ Path source = new Path(remoteDir, "file");
|
|
|
+ Path dest = new Path(localDir, "file");
|
|
|
+
|
|
|
+ Path source0byte = new Path(remoteDir, "file_0byte");
|
|
|
+ Path dest0byte = new Path(localDir, "file_0byte");
|
|
|
+ dest = localFS.makeQualified(dest);
|
|
|
+ dest0byte = localFS.makeQualified(dest0byte);
|
|
|
+
|
|
|
+ // Creating a source file with certain dataset.
|
|
|
+ byte[] sourceBlock = dataset(10, 'a', 'z');
|
|
|
+
|
|
|
+ // Write the dataset and as well create the target path.
|
|
|
+ ContractTestUtils.createFile(localFS, dest, true, sourceBlock);
|
|
|
+ ContractTestUtils
|
|
|
+ .writeDataset(remoteFS, source, sourceBlock, sourceBlock.length,
|
|
|
+ 1024, true);
|
|
|
+
|
|
|
+ // Create 0 byte source and target files.
|
|
|
+ ContractTestUtils.createFile(remoteFS, source0byte, true, new byte[0]);
|
|
|
+ ContractTestUtils.createFile(localFS, dest0byte, true, new byte[0]);
|
|
|
+
|
|
|
+ // Execute the distcp -update job.
|
|
|
+ Job job = distCpUpdateWithFs(remoteDir, localDir, remoteFS, localFS);
|
|
|
+
|
|
|
+ // First distcp -update would normally copy the source to dest.
|
|
|
+ verifyFileContents(localFS, dest, sourceBlock);
|
|
|
+ // Verify 1 file was skipped in the distcp -update (The 0 byte file).
|
|
|
+ // Verify 1 file was copied in the distcp -update (The new source file).
|
|
|
+ verifySkipAndCopyCounter(job, 1, 1);
|
|
|
+
|
|
|
+ // Remove the source file and replace with a file with same name and size
|
|
|
+ // but different content.
|
|
|
+ remoteFS.delete(source, false);
|
|
|
+ Path updatedSource = new Path(remoteDir, "file");
|
|
|
+ byte[] updatedSourceBlock = dataset(10, 'b', 'z');
|
|
|
+ ContractTestUtils.writeDataset(remoteFS, updatedSource,
|
|
|
+ updatedSourceBlock, updatedSourceBlock.length, 1024, true);
|
|
|
+
|
|
|
+ // For testing purposes we would take the modification time of the
|
|
|
+ // updated Source file and add an offset or subtract the offset and set
|
|
|
+ // that time as the modification time for target file, this way we can
|
|
|
+ // ensure that our test can emulate a scenario where source is either more
|
|
|
+ // recently changed after -update so that copy takes place or target file
|
|
|
+ // is more recently changed which would skip the copying since the source
|
|
|
+ // has not been recently updated.
|
|
|
+ FileStatus fsSourceUpd = remoteFS.getFileStatus(updatedSource);
|
|
|
+ long modTimeSourceUpd = fsSourceUpd.getModificationTime();
|
|
|
+
|
|
|
+ // Add by an offset which would ensure enough gap for the test to
|
|
|
+ // not fail due to race conditions.
|
|
|
+ long newTargetModTimeNew = modTimeSourceUpd + MODIFICATION_TIME_OFFSET;
|
|
|
+ localFS.setTimes(dest, newTargetModTimeNew, -1);
|
|
|
+
|
|
|
+ // Execute the distcp -update job.
|
|
|
+ Job updatedSourceJobOldSrc =
|
|
|
+ distCpUpdateWithFs(remoteDir, localDir, remoteFS,
|
|
|
+ localFS);
|
|
|
+
|
|
|
+ // File contents should remain same since the mod time for target is
|
|
|
+ // newer than the updatedSource which indicates that the sync happened
|
|
|
+ // more recently and there is no update.
|
|
|
+ verifyFileContents(localFS, dest, sourceBlock);
|
|
|
+ // Skipped both 0 byte file and sourceFile (since mod time of target is
|
|
|
+ // older than the source it is perceived that source is of older version
|
|
|
+ // and we can skip it's copy).
|
|
|
+ verifySkipAndCopyCounter(updatedSourceJobOldSrc, 2, 0);
|
|
|
+
|
|
|
+ // Subtract by an offset which would ensure enough gap for the test to
|
|
|
+ // not fail due to race conditions.
|
|
|
+ long newTargetModTimeOld =
|
|
|
+ Math.min(modTimeSourceUpd - MODIFICATION_TIME_OFFSET, 0);
|
|
|
+ localFS.setTimes(dest, newTargetModTimeOld, -1);
|
|
|
+
|
|
|
+ // Execute the distcp -update job.
|
|
|
+ Job updatedSourceJobNewSrc = distCpUpdateWithFs(remoteDir, localDir,
|
|
|
+ remoteFS,
|
|
|
+ localFS);
|
|
|
+
|
|
|
+ // Verifying the target directory have both 0 byte file and the content
|
|
|
+ // file.
|
|
|
+ Assertions
|
|
|
+ .assertThat(RemoteIterators.toList(localFS.listFiles(localDir, true)))
|
|
|
+ .hasSize(2);
|
|
|
+ // Now the copy should take place and the file contents should change
|
|
|
+ // since the mod time for target is older than the source file indicating
|
|
|
+ // that there was an update to the source after the last sync took place.
|
|
|
+ verifyFileContents(localFS, dest, updatedSourceBlock);
|
|
|
+ // Verifying we skipped the 0 byte file and copied the updated source
|
|
|
+ // file (since the modification time of the new source is older than the
|
|
|
+ // target now).
|
|
|
+ verifySkipAndCopyCounter(updatedSourceJobNewSrc, 1, 1);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Method to check the skipped and copied counters of a distcp job.
|
|
|
+ *
|
|
|
+ * @param job job to check.
|
|
|
+ * @param skipExpectedValue expected skip counter value.
|
|
|
+ * @param copyExpectedValue expected copy counter value.
|
|
|
+ * @throws IOException throw in case of failures.
|
|
|
+ */
|
|
|
+ private void verifySkipAndCopyCounter(Job job,
|
|
|
+ int skipExpectedValue, int copyExpectedValue) throws IOException {
|
|
|
+ // get the skip and copy counters from the job.
|
|
|
+ long skipActualValue = job.getCounters()
|
|
|
+ .findCounter(CopyMapper.Counter.SKIP).getValue();
|
|
|
+ long copyActualValue = job.getCounters()
|
|
|
+ .findCounter(CopyMapper.Counter.COPY).getValue();
|
|
|
+ // Verify if the actual values equals the expected ones.
|
|
|
+ assertEquals("Mismatch in COPY counter value", copyExpectedValue,
|
|
|
+ copyActualValue);
|
|
|
+ assertEquals("Mismatch in SKIP counter value", skipExpectedValue,
|
|
|
+ skipActualValue);
|
|
|
+ }
|
|
|
}
|