|
@@ -25,6 +25,9 @@ import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
+import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
+import org.apache.hadoop.hdfs.DFSTestUtil;
|
|
|
+import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.io.SequenceFile;
|
|
@@ -33,6 +36,7 @@ import org.apache.hadoop.mapreduce.*;
|
|
|
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
|
|
|
import org.apache.hadoop.mapreduce.task.JobContextImpl;
|
|
|
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
|
|
|
+import org.apache.hadoop.test.GenericTestUtils;
|
|
|
import org.apache.hadoop.tools.CopyListing;
|
|
|
import org.apache.hadoop.tools.CopyListingFileStatus;
|
|
|
import org.apache.hadoop.tools.DistCpConstants;
|
|
@@ -40,6 +44,7 @@ import org.apache.hadoop.tools.DistCpContext;
|
|
|
import org.apache.hadoop.tools.DistCpOptions;
|
|
|
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
|
|
|
import org.apache.hadoop.tools.GlobbedCopyListing;
|
|
|
+import org.apache.hadoop.tools.util.DistCpUtils;
|
|
|
import org.apache.hadoop.tools.util.TestDistCpUtils;
|
|
|
import org.apache.hadoop.security.Credentials;
|
|
|
import org.junit.*;
|
|
@@ -55,13 +60,16 @@ public class TestCopyCommitter {
|
|
|
|
|
|
private static final Random rand = new Random();
|
|
|
|
|
|
+ private static final long BLOCK_SIZE = 1024; // block size for the test cluster; create() applies it to both DFS block-size keys
|
|
|
private static final Credentials CREDENTIALS = new Credentials();
|
|
|
public static final int PORT = 39737;
|
|
|
|
|
|
|
|
|
- private static Configuration config;
|
|
|
+ private static Configuration clusterConfig; // configuration the shared MiniDFSCluster is built from in create()
|
|
|
private static MiniDFSCluster cluster;
|
|
|
|
|
|
+ private Configuration config; // per-test configuration; createMetaFolder() re-creates it as a copy of clusterConfig
|
|
|
+
|
|
|
private static Job getJobForClient() throws IOException {
|
|
|
Job job = Job.getInstance(new Configuration());
|
|
|
job.getConfiguration().set("mapred.job.tracker", "localhost:" + PORT);
|
|
@@ -73,10 +81,17 @@ public class TestCopyCommitter {
|
|
|
|
|
|
@BeforeClass
|
|
|
public static void create() throws IOException {
|
|
|
- config = getJobForClient().getConfiguration();
|
|
|
- config.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, 0);
|
|
|
- cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).format(true)
|
|
|
- .build();
|
|
|
+ clusterConfig = getJobForClient().getConfiguration(); // base configuration for the cluster; each test later works on its own copy
|
|
|
+ clusterConfig.setLong(
|
|
|
+ DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, 0);
|
|
|
+ clusterConfig.setLong(
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE); // NOTE(review): presumably lowers the NameNode minimum so the small BLOCK_SIZE is accepted - confirm
|
|
|
+ clusterConfig.setLong(
|
|
|
+ DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); // same small value for the cluster's block-size setting
|
|
|
+ cluster = new MiniDFSCluster.Builder(clusterConfig) // freshly formatted mini cluster with a single DataNode
|
|
|
+ .numDataNodes(1)
|
|
|
+ .format(true)
|
|
|
+ .build();
|
|
|
}
|
|
|
|
|
|
@AfterClass
|
|
@@ -88,6 +103,7 @@ public class TestCopyCommitter {
|
|
|
|
|
|
@Before
|
|
|
public void createMetaFolder() throws IOException {
|
|
|
+ config = new Configuration(clusterConfig);
|
|
|
config.set(DistCpConstants.CONF_LABEL_META_FOLDER, "/meta");
|
|
|
Path meta = new Path("/meta");
|
|
|
cluster.getFileSystem().mkdirs(meta);
|
|
@@ -397,6 +413,141 @@ public class TestCopyCommitter {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testCommitWithChecksumMismatchAndSkipCrc() throws IOException {
|
|
|
+ testCommitWithChecksumMismatch(true); // skipCrc = true: the shared scenario expects commitJob to succeed although the checksums differ
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testCommitWithChecksumMismatchWithoutSkipCrc()
|
|
|
+ throws IOException {
|
|
|
+ testCommitWithChecksumMismatch(false); // skipCrc = false: the shared scenario expects commitJob to fail with a "Checksum mismatch" cause
|
|
|
+ }
|
|
|
+
|
|
|
+ private void testCommitWithChecksumMismatch(boolean skipCrc)
|
|
|
+ throws IOException {
|
|
|
+ // Shared scenario: commit a job whose split work files differ in content from the source; skipCrc decides whether the commit must succeed (true) or fail (false).
|
|
|
+ TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
|
|
|
+ JobContext jobContext = new JobContextImpl(
|
|
|
+ taskAttemptContext.getConfiguration(),
|
|
|
+ taskAttemptContext.getTaskAttemptID().getJobID());
|
|
|
+ Configuration conf = jobContext.getConfiguration();
|
|
|
+ // Source and target directories get random names under /tmp1, which the finally block removes.
|
|
|
+ String sourceBase;
|
|
|
+ String targetBase;
|
|
|
+ FileSystem fs = null;
|
|
|
+ try {
|
|
|
+ fs = FileSystem.get(conf);
|
|
|
+ sourceBase = "/tmp1/" + String.valueOf(rand.nextLong());
|
|
|
+ targetBase = "/tmp1/" + String.valueOf(rand.nextLong());
|
|
|
+ // Create the source file plus two split work files written with a different seed.
|
|
|
+ int blocksPerChunk = 5;
|
|
|
+ String srcFilename = "/srcdata";
|
|
|
+ createSrcAndWorkFilesWithDifferentChecksum(fs, targetBase, sourceBase,
|
|
|
+ srcFilename, blocksPerChunk);
|
|
|
+ // Build the DistCp options (blocks per chunk and the skipCrc flag under test) and copy them into the job configuration.
|
|
|
+ DistCpOptions options = new DistCpOptions.Builder(
|
|
|
+ Collections.singletonList(new Path(sourceBase)),
|
|
|
+ new Path("/out"))
|
|
|
+ .withBlocksPerChunk(blocksPerChunk)
|
|
|
+ .withCRC(skipCrc)
|
|
|
+ .build();
|
|
|
+ options.appendToConf(conf);
|
|
|
+ conf.setBoolean(
|
|
|
+ DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, false);
|
|
|
+ DistCpContext context = new DistCpContext(options);
|
|
|
+ context.setTargetPathExists(false);
|
|
|
+ // Write the copy listing for the source path to a randomly named file under /tmp1.
|
|
|
+ CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
|
|
|
+ Path listingFile = new Path("/tmp1/"
|
|
|
+ + String.valueOf(rand.nextLong()));
|
|
|
+ listing.buildListing(listingFile, context);
|
|
|
+ // Work path and final path both point at the directory that holds the split work files.
|
|
|
+ conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
|
|
|
+ conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
|
|
|
+ // Run the commit and check the outcome expected for this skipCrc value.
|
|
|
+ OutputCommitter committer = new CopyCommitter(
|
|
|
+ null, taskAttemptContext);
|
|
|
+ try {
|
|
|
+ committer.commitJob(jobContext);
|
|
|
+ if (!skipCrc) {
|
|
|
+ Assert.fail("Expected commit to fail");
|
|
|
+ }
|
|
|
+ Assert.assertFalse(DistCpUtils.checksumsAreEqual( // commit went through, yet source and target file must still differ in checksum
|
|
|
+ fs, new Path(sourceBase + srcFilename), null,
|
|
|
+ fs, new Path(targetBase + srcFilename)));
|
|
|
+ } catch(IOException exception) {
|
|
|
+ if (skipCrc) {
|
|
|
+ LOG.error("Unexpected exception is found", exception);
|
|
|
+ throw exception; // no IOException is expected when skipCrc is true, so let it fail the test
|
|
|
+ }
|
|
|
+ Throwable cause = exception.getCause(); // the message is asserted on the cause, not on the top-level exception
|
|
|
+ GenericTestUtils.assertExceptionContains(
|
|
|
+ "Checksum mismatch", cause);
|
|
|
+ }
|
|
|
+ } finally { // remove the test directories whatever the outcome
|
|
|
+ TestDistCpUtils.delete(fs, "/tmp1");
|
|
|
+ TestDistCpUtils.delete(fs, "/meta");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Create a source file and DistCp working (split) files whose content is
|
|
|
+ * written with a different seed, to test checksum validation for copying blocks in parallel.
|
|
|
+ *
|
|
|
+ * For ease of construction, it assumes the source file can be broken down
|
|
|
+ * into exactly 2 working files (2 chunks).
|
|
|
+ *
|
|
|
+ * So for a source file with length =
|
|
|
+ * BLOCK_SIZE * blocksPerChunk + BLOCK_SIZE / 2,
|
|
|
+ * its 1st working file will have length =
|
|
|
+ * BLOCK_SIZE * blocksPerChunk,
|
|
|
+ * and the 2nd working file will have length =
|
|
|
+ * BLOCK_SIZE / 2.
|
|
|
+ * The working files are generated with a different seed to mimic the
|
|
|
+ * same-length-but-different-checksum scenario.
|
|
|
+ *
|
|
|
+ * @param fs the FileSystem; it is cast to DistributedFileSystem for the append
|
|
|
+ * @param targetBase the directory in which the working files are created
|
|
|
+ * @param sourceBase the directory in which the source file is created
|
|
|
+ * @param filename the file name, appended directly to both base paths
|
|
|
+ * @param blocksPerChunk the blocks per chunk config that enables copying
|
|
|
+ * blocks in parallel; sets the length of the first chunk
|
|
|
+ * @throws IOException when it fails to create files
|
|
|
+ */
|
|
|
+ private void createSrcAndWorkFilesWithDifferentChecksum(FileSystem fs,
|
|
|
+ String targetBase,
|
|
|
+ String sourceBase,
|
|
|
+ String filename,
|
|
|
+ int blocksPerChunk)
|
|
|
+ throws IOException {
|
|
|
+ // dstSeed is offset from srcSeed by a random value so the work files are written with a different seed than the source.
|
|
|
+ long srcSeed = System.currentTimeMillis();
|
|
|
+ long dstSeed = srcSeed + rand.nextLong();
|
|
|
+ int bufferLen = 128;
|
|
|
+ short replFactor = 2; // NOTE(review): create() starts the cluster with one DataNode - confirm a replication factor of 2 is intended
|
|
|
+ Path srcData = new Path(sourceBase + filename);
|
|
|
+
|
|
|
+ // create data with 2 chunks: the 2nd chunk has half of the block size
|
|
|
+ long firstChunkLength = BLOCK_SIZE * blocksPerChunk;
|
|
|
+ long secondChunkLength = BLOCK_SIZE / 2;
|
|
|
+ // Source file: firstChunkLength bytes written with srcSeed, then secondChunkLength more bytes added via appendFileNewBlock.
|
|
|
+ DFSTestUtil.createFile(fs, srcData,
|
|
|
+ bufferLen, firstChunkLength, BLOCK_SIZE, replFactor,
|
|
|
+ srcSeed);
|
|
|
+ DFSTestUtil.appendFileNewBlock((DistributedFileSystem) fs, srcData,
|
|
|
+ (int) secondChunkLength);
|
|
|
+ // Work files: named <filename>.____distcpSplit____<offset>.<length>, both written with dstSeed.
|
|
|
+ DFSTestUtil.createFile(fs, new Path(targetBase
|
|
|
+ + filename + ".____distcpSplit____0."
|
|
|
+ + firstChunkLength), bufferLen,
|
|
|
+ firstChunkLength, BLOCK_SIZE, replFactor, dstSeed);
|
|
|
+ DFSTestUtil.createFile(fs, new Path(targetBase
|
|
|
+ + filename + ".____distcpSplit____"
|
|
|
+ + firstChunkLength + "." + secondChunkLength), bufferLen,
|
|
|
+ secondChunkLength, BLOCK_SIZE, replFactor, dstSeed);
|
|
|
+ }
|
|
|
+
|
|
|
private TaskAttemptContext getTaskAttemptContext(Configuration conf) {
|
|
|
return new TaskAttemptContextImpl(conf,
|
|
|
new TaskAttemptID("200707121733", 1, TaskType.MAP, 1, 1));
|