|
@@ -21,6 +21,7 @@ package org.apache.hadoop.tools.mapred;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
|
+import org.apache.hadoop.fs.BlockLocation;
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
@@ -53,7 +54,7 @@ public class TestCopyMapper {
|
|
private static final Log LOG = LogFactory.getLog(TestCopyMapper.class);
|
|
private static final Log LOG = LogFactory.getLog(TestCopyMapper.class);
|
|
private static List<Path> pathList = new ArrayList<Path>();
|
|
private static List<Path> pathList = new ArrayList<Path>();
|
|
private static int nFiles = 0;
|
|
private static int nFiles = 0;
|
|
- private static final int FILE_SIZE = 1024;
|
|
|
|
|
|
+ private static final int DEFAULT_FILE_SIZE = 1024;
|
|
|
|
|
|
private static MiniDFSCluster cluster;
|
|
private static MiniDFSCluster cluster;
|
|
|
|
|
|
@@ -92,7 +93,7 @@ public class TestCopyMapper {
|
|
configuration.setBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(),
|
|
configuration.setBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(),
|
|
false);
|
|
false);
|
|
configuration.setBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(),
|
|
configuration.setBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(),
|
|
- true);
|
|
|
|
|
|
+ false);
|
|
configuration.setBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(),
|
|
configuration.setBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(),
|
|
true);
|
|
true);
|
|
configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
|
|
configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
|
|
@@ -112,6 +113,18 @@ public class TestCopyMapper {
|
|
touchFile(SOURCE_PATH + "/7/8/9");
|
|
touchFile(SOURCE_PATH + "/7/8/9");
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private static void createSourceDataWithDifferentBlockSize() throws Exception {
|
|
|
|
+ mkdirs(SOURCE_PATH + "/1");
|
|
|
|
+ mkdirs(SOURCE_PATH + "/2");
|
|
|
|
+ mkdirs(SOURCE_PATH + "/2/3/4");
|
|
|
|
+ mkdirs(SOURCE_PATH + "/2/3");
|
|
|
|
+ mkdirs(SOURCE_PATH + "/5");
|
|
|
|
+ touchFile(SOURCE_PATH + "/5/6", true);
|
|
|
|
+ mkdirs(SOURCE_PATH + "/7");
|
|
|
|
+ mkdirs(SOURCE_PATH + "/7/8");
|
|
|
|
+ touchFile(SOURCE_PATH + "/7/8/9");
|
|
|
|
+ }
|
|
|
|
+
|
|
private static void mkdirs(String path) throws Exception {
|
|
private static void mkdirs(String path) throws Exception {
|
|
FileSystem fileSystem = cluster.getFileSystem();
|
|
FileSystem fileSystem = cluster.getFileSystem();
|
|
final Path qualifiedPath = new Path(path).makeQualified(fileSystem.getUri(),
|
|
final Path qualifiedPath = new Path(path).makeQualified(fileSystem.getUri(),
|
|
@@ -121,17 +134,31 @@ public class TestCopyMapper {
|
|
}
|
|
}
|
|
|
|
|
|
private static void touchFile(String path) throws Exception {
|
|
private static void touchFile(String path) throws Exception {
|
|
|
|
+ touchFile(path, false);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static void touchFile(String path, boolean createMultipleBlocks) throws Exception {
|
|
|
|
+ final long NON_DEFAULT_BLOCK_SIZE = 4096;
|
|
FileSystem fs;
|
|
FileSystem fs;
|
|
DataOutputStream outputStream = null;
|
|
DataOutputStream outputStream = null;
|
|
try {
|
|
try {
|
|
fs = cluster.getFileSystem();
|
|
fs = cluster.getFileSystem();
|
|
final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(),
|
|
final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(),
|
|
fs.getWorkingDirectory());
|
|
fs.getWorkingDirectory());
|
|
- final long blockSize = fs.getDefaultBlockSize(qualifiedPath) * 2;
|
|
|
|
|
|
+ final long blockSize = createMultipleBlocks? NON_DEFAULT_BLOCK_SIZE : fs.getDefaultBlockSize(qualifiedPath) * 2;
|
|
outputStream = fs.create(qualifiedPath, true, 0,
|
|
outputStream = fs.create(qualifiedPath, true, 0,
|
|
(short)(fs.getDefaultReplication(qualifiedPath)*2),
|
|
(short)(fs.getDefaultReplication(qualifiedPath)*2),
|
|
blockSize);
|
|
blockSize);
|
|
- outputStream.write(new byte[FILE_SIZE]);
|
|
|
|
|
|
+ byte[] bytes = new byte[DEFAULT_FILE_SIZE];
|
|
|
|
+ outputStream.write(bytes);
|
|
|
|
+ long fileSize = DEFAULT_FILE_SIZE;
|
|
|
|
+ if (createMultipleBlocks) {
|
|
|
|
+ while (fileSize < 2*blockSize) {
|
|
|
|
+ outputStream.write(bytes);
|
|
|
|
+ outputStream.flush();
|
|
|
|
+ fileSize += DEFAULT_FILE_SIZE;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
pathList.add(qualifiedPath);
|
|
pathList.add(qualifiedPath);
|
|
++nFiles;
|
|
++nFiles;
|
|
|
|
|
|
@@ -144,7 +171,7 @@ public class TestCopyMapper {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
|
|
+ @Test(timeout=40000)
|
|
public void testRun() {
|
|
public void testRun() {
|
|
try {
|
|
try {
|
|
deleteState();
|
|
deleteState();
|
|
@@ -179,7 +206,7 @@ public class TestCopyMapper {
|
|
|
|
|
|
Assert.assertEquals(pathList.size(),
|
|
Assert.assertEquals(pathList.size(),
|
|
stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue());
|
|
stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue());
|
|
- Assert.assertEquals(nFiles * FILE_SIZE,
|
|
|
|
|
|
+ Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE,
|
|
stubContext.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED).getValue());
|
|
stubContext.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED).getValue());
|
|
|
|
|
|
testCopyingExistingFiles(fs, copyMapper, context);
|
|
testCopyingExistingFiles(fs, copyMapper, context);
|
|
@@ -211,7 +238,7 @@ public class TestCopyMapper {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
|
|
+ @Test(timeout=40000)
|
|
public void testMakeDirFailure() {
|
|
public void testMakeDirFailure() {
|
|
try {
|
|
try {
|
|
deleteState();
|
|
deleteState();
|
|
@@ -239,13 +266,13 @@ public class TestCopyMapper {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
|
|
+ @Test(timeout=40000)
|
|
public void testIgnoreFailures() {
|
|
public void testIgnoreFailures() {
|
|
doTestIgnoreFailures(true);
|
|
doTestIgnoreFailures(true);
|
|
doTestIgnoreFailures(false);
|
|
doTestIgnoreFailures(false);
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
|
|
+ @Test(timeout=40000)
|
|
public void testDirToFile() {
|
|
public void testDirToFile() {
|
|
try {
|
|
try {
|
|
deleteState();
|
|
deleteState();
|
|
@@ -273,7 +300,7 @@ public class TestCopyMapper {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
|
|
+ @Test(timeout=40000)
|
|
public void testPreserve() {
|
|
public void testPreserve() {
|
|
try {
|
|
try {
|
|
deleteState();
|
|
deleteState();
|
|
@@ -343,7 +370,7 @@ public class TestCopyMapper {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
|
|
+ @Test(timeout=40000)
|
|
public void testCopyReadableFiles() {
|
|
public void testCopyReadableFiles() {
|
|
try {
|
|
try {
|
|
deleteState();
|
|
deleteState();
|
|
@@ -406,7 +433,7 @@ public class TestCopyMapper {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
|
|
+ @Test(timeout=40000)
|
|
public void testSkipCopyNoPerms() {
|
|
public void testSkipCopyNoPerms() {
|
|
try {
|
|
try {
|
|
deleteState();
|
|
deleteState();
|
|
@@ -480,7 +507,7 @@ public class TestCopyMapper {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
|
|
+ @Test(timeout=40000)
|
|
public void testFailCopyWithAccessControlException() {
|
|
public void testFailCopyWithAccessControlException() {
|
|
try {
|
|
try {
|
|
deleteState();
|
|
deleteState();
|
|
@@ -558,7 +585,7 @@ public class TestCopyMapper {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
|
|
+ @Test(timeout=40000)
|
|
public void testFileToDir() {
|
|
public void testFileToDir() {
|
|
try {
|
|
try {
|
|
deleteState();
|
|
deleteState();
|
|
@@ -635,12 +662,48 @@ public class TestCopyMapper {
|
|
cluster.getFileSystem().delete(new Path(TARGET_PATH), true);
|
|
cluster.getFileSystem().delete(new Path(TARGET_PATH), true);
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
|
|
+ @Test(timeout=40000)
|
|
public void testPreserveBlockSizeAndReplication() {
|
|
public void testPreserveBlockSizeAndReplication() {
|
|
testPreserveBlockSizeAndReplicationImpl(true);
|
|
testPreserveBlockSizeAndReplicationImpl(true);
|
|
testPreserveBlockSizeAndReplicationImpl(false);
|
|
testPreserveBlockSizeAndReplicationImpl(false);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Test(timeout=40000)
|
|
|
|
+ public void testCopyFailOnBlockSizeDifference() {
|
|
|
|
+ try {
|
|
|
|
+
|
|
|
|
+ deleteState();
|
|
|
|
+ createSourceDataWithDifferentBlockSize();
|
|
|
|
+
|
|
|
|
+ FileSystem fs = cluster.getFileSystem();
|
|
|
|
+ CopyMapper copyMapper = new CopyMapper();
|
|
|
|
+ StubContext stubContext = new StubContext(getConfiguration(), null, 0);
|
|
|
|
+ Mapper<Text, FileStatus, Text, Text>.Context context
|
|
|
|
+ = stubContext.getContext();
|
|
|
|
+
|
|
|
|
+ Configuration configuration = context.getConfiguration();
|
|
|
|
+ EnumSet<DistCpOptions.FileAttribute> fileAttributes
|
|
|
|
+ = EnumSet.noneOf(DistCpOptions.FileAttribute.class);
|
|
|
|
+ configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
|
|
|
|
+ DistCpUtils.packAttributes(fileAttributes));
|
|
|
|
+
|
|
|
|
+ copyMapper.setup(context);
|
|
|
|
+
|
|
|
|
+ for (Path path : pathList) {
|
|
|
|
+ final FileStatus fileStatus = fs.getFileStatus(path);
|
|
|
|
+ copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
|
|
|
|
+ fileStatus, context);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ Assert.fail("Copy should have failed because of block-size difference.");
|
|
|
|
+ }
|
|
|
|
+ catch (Exception exception) {
|
|
|
|
+ // Check that the exception suggests the use of -pb/-skipCrc.
|
|
|
|
+ Assert.assertTrue("Failure exception should have suggested the use of -pb.", exception.getCause().getMessage().contains("pb"));
|
|
|
|
+ Assert.assertTrue("Failure exception should have suggested the use of -skipCrc.", exception.getCause().getMessage().contains("skipCrc"));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
private void testPreserveBlockSizeAndReplicationImpl(boolean preserve){
|
|
private void testPreserveBlockSizeAndReplicationImpl(boolean preserve){
|
|
try {
|
|
try {
|
|
|
|
|
|
@@ -712,7 +775,7 @@ public class TestCopyMapper {
|
|
* If a single file is being copied to a location where the file (of the same
|
|
* If a single file is being copied to a location where the file (of the same
|
|
* name) already exists, then the file shouldn't be skipped.
|
|
* name) already exists, then the file shouldn't be skipped.
|
|
*/
|
|
*/
|
|
- @Test
|
|
|
|
|
|
+ @Test(timeout=40000)
|
|
public void testSingleFileCopy() {
|
|
public void testSingleFileCopy() {
|
|
try {
|
|
try {
|
|
deleteState();
|
|
deleteState();
|
|
@@ -761,7 +824,7 @@ public class TestCopyMapper {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
|
|
+ @Test(timeout=40000)
|
|
public void testPreserveUserGroup() {
|
|
public void testPreserveUserGroup() {
|
|
testPreserveUserGroupImpl(true);
|
|
testPreserveUserGroupImpl(true);
|
|
testPreserveUserGroupImpl(false);
|
|
testPreserveUserGroupImpl(false);
|