|
@@ -23,17 +23,27 @@ import static org.hamcrest.core.Is.is;
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
|
|
|
+import java.io.ByteArrayOutputStream;
|
|
|
import java.io.IOException;
|
|
|
-import java.io.OutputStream;
|
|
|
+import java.io.PrintStream;
|
|
|
import java.net.URI;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
+import java.util.Random;
|
|
|
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.FsShell;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
+import org.apache.hadoop.hdfs.DFSTestUtil;
|
|
|
+import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
|
|
import org.apache.hadoop.util.ToolRunner;
|
|
|
import org.junit.AfterClass;
|
|
|
import org.junit.Assert;
|
|
@@ -47,11 +57,15 @@ import org.junit.rules.Timeout;
|
|
|
*/
|
|
|
|
|
|
public class TestDistCpSystem {
|
|
|
+ private static final Log LOG =
|
|
|
+ LogFactory.getLog(TestDistCpSystem.class);
|
|
|
+
|
|
|
@Rule
|
|
|
public Timeout globalTimeout = new Timeout(30000);
|
|
|
|
|
|
private static final String SRCDAT = "srcdat";
|
|
|
private static final String DSTDAT = "dstdat";
|
|
|
+ private static final long BLOCK_SIZE = 1024;
|
|
|
|
|
|
private static MiniDFSCluster cluster;
|
|
|
private static Configuration conf;
|
|
@@ -63,27 +77,76 @@ public class TestDistCpSystem {
|
|
|
this.path = path;
|
|
|
this.isDir = isDir;
|
|
|
}
|
|
|
- String getPath() { return path; }
|
|
|
- boolean isDirectory() { return isDir; }
|
|
|
+
|
|
|
+ String getPath() {
|
|
|
+ return path;
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean isDirectory() {
|
|
|
+ return isDir;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @BeforeClass
|
|
|
+ public static void beforeClass() throws IOException {
|
|
|
+ conf = new Configuration();
|
|
|
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
|
|
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
|
|
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
|
|
+ cluster.waitActive();
|
|
|
+ }
|
|
|
+
|
|
|
+ @AfterClass
|
|
|
+ public static void afterClass() throws IOException {
|
|
|
+ if (cluster != null) {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ static String execCmd(FsShell shell, String... args) throws Exception {
|
|
|
+ ByteArrayOutputStream baout = new ByteArrayOutputStream();
|
|
|
+ PrintStream out = new PrintStream(baout, true);
|
|
|
+ PrintStream old = System.out;
|
|
|
+ System.setOut(out);
|
|
|
+ shell.run(args);
|
|
|
+ out.close();
|
|
|
+ System.setOut(old);
|
|
|
+ return baout.toString();
|
|
|
}
|
|
|
|
|
|
- private void createFiles(FileSystem fs, String topdir,
|
|
|
- FileEntry[] entries) throws IOException {
|
|
|
+ private void createFiles(DistributedFileSystem fs, String topdir,
|
|
|
+ FileEntry[] entries, long chunkSize) throws IOException {
|
|
|
+ long seed = System.currentTimeMillis();
|
|
|
+ Random rand = new Random(seed);
|
|
|
+ short replicationFactor = 2;
|
|
|
for (FileEntry entry : entries) {
|
|
|
- Path newpath = new Path(topdir + "/" + entry.getPath());
|
|
|
+ Path newPath = new Path(topdir + "/" + entry.getPath());
|
|
|
if (entry.isDirectory()) {
|
|
|
- fs.mkdirs(newpath);
|
|
|
+ fs.mkdirs(newPath);
|
|
|
} else {
|
|
|
- OutputStream out = fs.create(newpath);
|
|
|
- try {
|
|
|
- out.write((topdir + "/" + entry).getBytes());
|
|
|
- out.write("\n".getBytes());
|
|
|
- } finally {
|
|
|
- out.close();
|
|
|
+ long fileSize = BLOCK_SIZE *100;
|
|
|
+ int bufSize = 128;
|
|
|
+ if (chunkSize == -1) {
|
|
|
+ DFSTestUtil.createFile(fs, newPath, bufSize,
|
|
|
+ fileSize, BLOCK_SIZE, replicationFactor, seed);
|
|
|
+ } else {
|
|
|
+ // Create a variable length block file, by creating
|
|
|
+ // one block of half block size at the chunk boundary
|
|
|
+ long seg1 = chunkSize * BLOCK_SIZE - BLOCK_SIZE / 2;
|
|
|
+ long seg2 = fileSize - seg1;
|
|
|
+ DFSTestUtil.createFile(fs, newPath, bufSize,
|
|
|
+ seg1, BLOCK_SIZE, replicationFactor, seed);
|
|
|
+ DFSTestUtil.appendFileNewBlock(fs, newPath, (int)seg2);
|
|
|
}
|
|
|
}
|
|
|
+ seed = System.currentTimeMillis() + rand.nextLong();
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private void createFiles(DistributedFileSystem fs, String topdir,
|
|
|
+ FileEntry[] entries) throws IOException {
|
|
|
+ createFiles(fs, topdir, entries, -1);
|
|
|
+ }
|
|
|
|
|
|
private static FileStatus[] getFileStatus(FileSystem fs,
|
|
|
String topdir, FileEntry[] files) throws IOException {
|
|
@@ -104,18 +167,19 @@ public class TestDistCpSystem {
|
|
|
}
|
|
|
|
|
|
private void testPreserveUserHelper(String testRoot,
|
|
|
- FileEntry[] srcEntries,
|
|
|
- FileEntry[] dstEntries,
|
|
|
- boolean createSrcDir,
|
|
|
- boolean createTgtDir,
|
|
|
- boolean update) throws Exception {
|
|
|
+ FileEntry[] srcEntries,
|
|
|
+ FileEntry[] dstEntries,
|
|
|
+ boolean createSrcDir,
|
|
|
+ boolean createTgtDir,
|
|
|
+ boolean update) throws Exception {
|
|
|
final String testSrcRel = SRCDAT;
|
|
|
final String testSrc = testRoot + "/" + testSrcRel;
|
|
|
final String testDstRel = DSTDAT;
|
|
|
final String testDst = testRoot + "/" + testDstRel;
|
|
|
|
|
|
String nnUri = FileSystem.getDefaultUri(conf).toString();
|
|
|
- FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
|
|
|
+ DistributedFileSystem fs = (DistributedFileSystem)
|
|
|
+ FileSystem.get(URI.create(nnUri), conf);
|
|
|
fs.mkdirs(new Path(testRoot));
|
|
|
if (createSrcDir) {
|
|
|
fs.mkdirs(new Path(testSrc));
|
|
@@ -129,8 +193,8 @@ public class TestDistCpSystem {
|
|
|
for(int i = 0; i < srcEntries.length; i++) {
|
|
|
fs.setOwner(srcstats[i].getPath(), "u" + i, null);
|
|
|
}
|
|
|
- String[] args = update? new String[]{"-pu", "-update", nnUri+testSrc,
|
|
|
- nnUri+testDst} : new String[]{"-pu", nnUri+testSrc, nnUri+testDst};
|
|
|
+ String[] args = update? new String[]{"-pub", "-update", nnUri+testSrc,
|
|
|
+ nnUri+testDst} : new String[]{"-pub", nnUri+testSrc, nnUri+testDst};
|
|
|
|
|
|
ToolRunner.run(conf, new DistCp(), args);
|
|
|
|
|
@@ -145,20 +209,263 @@ public class TestDistCpSystem {
|
|
|
deldir(fs, testRoot);
|
|
|
}
|
|
|
|
|
|
- @BeforeClass
|
|
|
- public static void beforeClass() throws IOException {
|
|
|
- conf = new Configuration();
|
|
|
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
|
|
- cluster.waitActive();
|
|
|
+ private void compareFiles(FileSystem fs, FileStatus srcStat,
|
|
|
+ FileStatus dstStat) throws Exception {
|
|
|
+ LOG.info("Comparing " + srcStat + " and " + dstStat);
|
|
|
+ assertEquals(srcStat.isDirectory(), dstStat.isDirectory());
|
|
|
+ assertEquals(srcStat.getReplication(), dstStat.getReplication());
|
|
|
+ assertEquals("File POSIX permission should match",
|
|
|
+ srcStat.getPermission(), dstStat.getPermission());
|
|
|
+ assertEquals("File user ownership should match",
|
|
|
+ srcStat.getOwner(), dstStat.getOwner());
|
|
|
+ assertEquals("File group ownership should match",
|
|
|
+ srcStat.getGroup(), dstStat.getGroup());
|
|
|
+ // TODO; check ACL attributes
|
|
|
+
|
|
|
+ if (srcStat.isDirectory()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ assertEquals("File length should match (" + srcStat.getPath() + ")",
|
|
|
+ srcStat.getLen(), dstStat.getLen());
|
|
|
+
|
|
|
+ FSDataInputStream srcIn = fs.open(srcStat.getPath());
|
|
|
+ FSDataInputStream dstIn = fs.open(dstStat.getPath());
|
|
|
+ try {
|
|
|
+ byte[] readSrc = new byte[(int)
|
|
|
+ HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT];
|
|
|
+ byte[] readDst = new byte[(int)
|
|
|
+ HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT];
|
|
|
+
|
|
|
+ int srcBytesRead = 0, tgtBytesRead = 0;
|
|
|
+ int srcIdx = 0, tgtIdx = 0;
|
|
|
+ long totalComparedBytes = 0;
|
|
|
+ while (true) {
|
|
|
+ if (srcBytesRead == 0) {
|
|
|
+ srcBytesRead = srcIn.read(readSrc);
|
|
|
+ srcIdx = 0;
|
|
|
+ }
|
|
|
+ if (tgtBytesRead == 0) {
|
|
|
+ tgtBytesRead = dstIn.read(readDst);
|
|
|
+ tgtIdx = 0;
|
|
|
+ }
|
|
|
+ if (srcBytesRead == 0 || tgtBytesRead == 0) {
|
|
|
+ LOG.info("______ compared src and dst files for "
|
|
|
+ + totalComparedBytes + " bytes, content match.");
|
|
|
+ if (srcBytesRead != tgtBytesRead) {
|
|
|
+ Assert.fail("Read mismatching size, compared "
|
|
|
+ + totalComparedBytes + " bytes between src and dst file "
|
|
|
+ + srcStat + " and " + dstStat);
|
|
|
+ }
|
|
|
+ if (totalComparedBytes != srcStat.getLen()) {
|
|
|
+ Assert.fail("Only read/compared " + totalComparedBytes +
|
|
|
+ " bytes between src and dst file " + srcStat +
|
|
|
+ " and " + dstStat);
|
|
|
+ } else {
|
|
|
+ // success
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ for (; srcIdx < srcBytesRead && tgtIdx < tgtBytesRead;
|
|
|
+ ++srcIdx, ++tgtIdx) {
|
|
|
+ if (readSrc[srcIdx] != readDst[tgtIdx]) {
|
|
|
+ Assert.fail("src and dst file does not match at "
|
|
|
+ + totalComparedBytes + " between "
|
|
|
+ + srcStat + " and " + dstStat);
|
|
|
+ }
|
|
|
+ ++totalComparedBytes;
|
|
|
+ }
|
|
|
+ LOG.info("______ compared src and dst files for "
|
|
|
+ + totalComparedBytes + " bytes, content match. FileLength: "
|
|
|
+ + srcStat.getLen());
|
|
|
+ if (totalComparedBytes == srcStat.getLen()) {
|
|
|
+ LOG.info("______ Final:" + srcIdx + " "
|
|
|
+ + srcBytesRead + " " + tgtIdx + " " + tgtBytesRead);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ if (srcIdx == srcBytesRead) {
|
|
|
+ srcBytesRead = 0;
|
|
|
+ }
|
|
|
+ if (tgtIdx == tgtBytesRead) {
|
|
|
+ tgtBytesRead = 0;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ if (srcIn != null) {
|
|
|
+ srcIn.close();
|
|
|
+ }
|
|
|
+ if (dstIn != null) {
|
|
|
+ dstIn.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- @AfterClass
|
|
|
- public static void afterClass() throws IOException {
|
|
|
- if (cluster != null) {
|
|
|
- cluster.shutdown();
|
|
|
+ // WC: needed because the current distcp does not create target dirs
|
|
|
+ private void createDestDir(FileSystem fs, String testDst,
|
|
|
+ FileStatus[] srcStats, FileEntry[] srcFiles) throws IOException {
|
|
|
+ fs.mkdirs(new Path(testDst));
|
|
|
+
|
|
|
+ for (int i=0; i<srcStats.length; i++) {
|
|
|
+ FileStatus srcStat = srcStats[i];
|
|
|
+ if (srcStat.isDirectory()) {
|
|
|
+ Path dstPath = new Path(testDst, srcFiles[i].getPath());
|
|
|
+ fs.mkdirs(dstPath);
|
|
|
+ fs.setOwner(dstPath, srcStat.getOwner(), srcStat.getGroup());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void copyAndVerify(final DistributedFileSystem fs,
|
|
|
+ final FileEntry[] srcFiles, final FileStatus[] srcStats,
|
|
|
+ final String testDst,
|
|
|
+ final String[] args) throws Exception {
|
|
|
+ final String testRoot = "/testdir";
|
|
|
+ FsShell shell = new FsShell(fs.getConf());
|
|
|
+
|
|
|
+ LOG.info("ls before distcp");
|
|
|
+ LOG.info(execCmd(shell, "-lsr", testRoot));
|
|
|
+
|
|
|
+ LOG.info("_____ running distcp: " + args[0] + " " + args[1]);
|
|
|
+ ToolRunner.run(conf, new DistCp(), args);
|
|
|
+
|
|
|
+ LOG.info("ls after distcp");
|
|
|
+ LOG.info(execCmd(shell, "-lsr", testRoot));
|
|
|
+
|
|
|
+ FileStatus[] dstStat = getFileStatus(fs, testDst, srcFiles);
|
|
|
+ for (int i=0; i< dstStat.length; i++) {
|
|
|
+ compareFiles(fs, srcStats[i], dstStat[i]);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void chunkCopy(FileEntry[] srcFiles) throws Exception {
|
|
|
+ final String testRoot = "/testdir";
|
|
|
+ final String testSrcRel = SRCDAT;
|
|
|
+ final String testSrc = testRoot + "/" + testSrcRel;
|
|
|
+ final String testDstRel = DSTDAT;
|
|
|
+ final String testDst = testRoot + "/" + testDstRel;
|
|
|
+ long chunkSize = 8;
|
|
|
+
|
|
|
+ String nnUri = FileSystem.getDefaultUri(conf).toString();
|
|
|
+ DistributedFileSystem fs = (DistributedFileSystem)
|
|
|
+ FileSystem.get(URI.create(nnUri), conf);
|
|
|
+
|
|
|
+ createFiles(fs, testRoot, srcFiles, chunkSize);
|
|
|
+
|
|
|
+ FileStatus[] srcStats = getFileStatus(fs, testRoot, srcFiles);
|
|
|
+ for (int i = 0; i < srcFiles.length; i++) {
|
|
|
+ fs.setOwner(srcStats[i].getPath(), "u" + i, "g" + i);
|
|
|
+ }
|
|
|
+ // get file status after updating owners
|
|
|
+ srcStats = getFileStatus(fs, testRoot, srcFiles);
|
|
|
+
|
|
|
+ createDestDir(fs, testDst, srcStats, srcFiles);
|
|
|
+
|
|
|
+ String[] args = new String[] {"-pugp", "-blocksperchunk",
|
|
|
+ String.valueOf(chunkSize),
|
|
|
+ nnUri + testSrc, nnUri + testDst};
|
|
|
+
|
|
|
+ copyAndVerify(fs, srcFiles, srcStats, testDst, args);
|
|
|
+ // Do it again
|
|
|
+ copyAndVerify(fs, srcFiles, srcStats, testDst, args);
|
|
|
+
|
|
|
+ // modify last file and rerun distcp with -update option
|
|
|
+ LOG.info("Modify a file and copy again");
|
|
|
+ for(int i=srcFiles.length-1; i >=0; --i) {
|
|
|
+ if (!srcFiles[i].isDirectory()) {
|
|
|
+ LOG.info("Modifying " + srcStats[i].getPath());
|
|
|
+ DFSTestUtil.appendFileNewBlock(fs, srcStats[i].getPath(),
|
|
|
+ (int)BLOCK_SIZE * 3);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // get file status after modifying file
|
|
|
+ srcStats = getFileStatus(fs, testRoot, srcFiles);
|
|
|
+
|
|
|
+ args = new String[] {"-pugp", "-update", "-blocksperchunk",
|
|
|
+ String.valueOf(chunkSize),
|
|
|
+ nnUri + testSrc, nnUri + testDst + "/" + testSrcRel};
|
|
|
+
|
|
|
+ copyAndVerify(fs, srcFiles, srcStats, testDst, args);
|
|
|
+
|
|
|
+ deldir(fs, testRoot);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testRecursiveChunkCopy() throws Exception {
|
|
|
+ FileEntry[] srcFiles = {
|
|
|
+ new FileEntry(SRCDAT, true),
|
|
|
+ new FileEntry(SRCDAT + "/file0", false),
|
|
|
+ new FileEntry(SRCDAT + "/dir1", true),
|
|
|
+ new FileEntry(SRCDAT + "/dir2", true),
|
|
|
+ new FileEntry(SRCDAT + "/dir1/file1", false)
|
|
|
+ };
|
|
|
+ chunkCopy(srcFiles);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testChunkCopyOneFile() throws Exception {
|
|
|
+ FileEntry[] srcFiles = {
|
|
|
+ new FileEntry(SRCDAT, true),
|
|
|
+ new FileEntry(SRCDAT + "/file0", false)
|
|
|
+ };
|
|
|
+ chunkCopy(srcFiles);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testDistcpLargeFile() throws Exception {
|
|
|
+ FileEntry[] srcfiles = {
|
|
|
+ new FileEntry(SRCDAT, true),
|
|
|
+ new FileEntry(SRCDAT + "/file", false)
|
|
|
+ };
|
|
|
+
|
|
|
+ final String testRoot = "/testdir";
|
|
|
+ final String testSrcRel = SRCDAT;
|
|
|
+ final String testSrc = testRoot + "/" + testSrcRel;
|
|
|
+ final String testDstRel = DSTDAT;
|
|
|
+ final String testDst = testRoot + "/" + testDstRel;
|
|
|
+
|
|
|
+ String nnUri = FileSystem.getDefaultUri(conf).toString();
|
|
|
+ DistributedFileSystem fs =
|
|
|
+ (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
|
|
|
+ fs.mkdirs(new Path(testRoot));
|
|
|
+ fs.mkdirs(new Path(testSrc));
|
|
|
+ fs.mkdirs(new Path(testDst));
|
|
|
+ long chunkSize = 6;
|
|
|
+ createFiles(fs, testRoot, srcfiles, chunkSize);
|
|
|
+
|
|
|
+ String srcFileName = testRoot + Path.SEPARATOR + srcfiles[1].getPath();
|
|
|
+ Path srcfile = new Path(srcFileName);
|
|
|
+
|
|
|
+ if(!cluster.getFileSystem().exists(srcfile)){
|
|
|
+ throw new Exception("src not exist");
|
|
|
+ }
|
|
|
+
|
|
|
+ final long srcLen = fs.getFileStatus(srcfile).getLen();
|
|
|
+
|
|
|
+ FileStatus[] srcstats = getFileStatus(fs, testRoot, srcfiles);
|
|
|
+ for (int i = 0; i < srcfiles.length; i++) {
|
|
|
+ fs.setOwner(srcstats[i].getPath(), "u" + i, null);
|
|
|
+ }
|
|
|
+ String[] args = new String[] {
|
|
|
+ "-blocksperchunk",
|
|
|
+ String.valueOf(chunkSize),
|
|
|
+ nnUri + testSrc,
|
|
|
+ nnUri + testDst
|
|
|
+ };
|
|
|
+
|
|
|
+ LOG.info("_____ running distcp: " + args[0] + " " + args[1]);
|
|
|
+ ToolRunner.run(conf, new DistCp(), args);
|
|
|
+
|
|
|
+ String realTgtPath = testDst;
|
|
|
+ FileStatus[] dststat = getFileStatus(fs, realTgtPath, srcfiles);
|
|
|
+ assertEquals("File length should match", srcLen,
|
|
|
+ dststat[dststat.length - 1].getLen());
|
|
|
+
|
|
|
+ this.compareFiles(fs, srcstats[srcstats.length-1],
|
|
|
+ dststat[dststat.length-1]);
|
|
|
+ deldir(fs, testRoot);
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testPreserveUseNonEmptyDir() throws Exception {
|
|
|
String testRoot = "/testdir." + getMethodName();
|
|
@@ -180,7 +487,6 @@ public class TestDistCpSystem {
|
|
|
testPreserveUserHelper(testRoot, srcfiles, dstfiles, false, false, false);
|
|
|
}
|
|
|
|
|
|
-
|
|
|
@Test
|
|
|
public void testPreserveUserEmptyDir() throws Exception {
|
|
|
String testRoot = "/testdir." + getMethodName();
|