|
@@ -18,12 +18,17 @@
|
|
|
package org.apache.hadoop.mapred.lib;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.io.OutputStream;
|
|
|
+import java.util.List;
|
|
|
+import java.util.zip.GZIPOutputStream;
|
|
|
|
|
|
import junit.framework.TestCase;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
+import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.io.Text;
|
|
@@ -32,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
|
|
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.PathFilter;
|
|
|
+import org.apache.hadoop.mapred.FileInputFormat;
|
|
|
import org.apache.hadoop.mapred.InputSplit;
|
|
|
import org.apache.hadoop.mapred.JobConf;
|
|
|
import org.apache.hadoop.mapred.Reporter;
|
|
@@ -64,9 +70,11 @@ public class TestCombineFileInputFormat extends TestCase{
|
|
|
final Path dir2 = new Path(inDir, "/dir2");
|
|
|
final Path dir3 = new Path(inDir, "/dir3");
|
|
|
final Path dir4 = new Path(inDir, "/dir4");
|
|
|
+ final Path dir5 = new Path(inDir, "/dir5");
|
|
|
|
|
|
static final int BLOCKSIZE = 1024;
|
|
|
static final byte[] databuf = new byte[BLOCKSIZE];
|
|
|
+ private static final String DUMMY_FS_URI = "dummyfs:///";
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(TestCombineFileInputFormat.class);
|
|
|
|
|
@@ -78,6 +86,24 @@ public class TestCombineFileInputFormat extends TestCase{
|
|
|
return null;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /** Dummy class to extend CombineFileInputFormat. It allows
|
|
|
+ * non-existent files to be passed into the CombineFileInputFormat, allows
|
|
|
+ * for easy testing without having to create real files.
|
|
|
+ */
|
|
|
+ private class DummyInputFormat1 extends DummyInputFormat {
|
|
|
+ @Override
|
|
|
+ protected FileStatus[] listStatus(JobConf job) throws IOException {
|
|
|
+ Path[] files = getInputPaths(job);
|
|
|
+ FileStatus[] results = new FileStatus[files.length];
|
|
|
+ for (int i = 0; i < files.length; i++) {
|
|
|
+ Path p = files[i];
|
|
|
+ FileSystem fs = p.getFileSystem(job);
|
|
|
+ results[i] = fs.getFileStatus(p);
|
|
|
+ }
|
|
|
+ return results;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
public void testSplitPlacement() throws IOException {
|
|
|
String namenode = null;
|
|
@@ -86,16 +112,16 @@ public class TestCombineFileInputFormat extends TestCase{
|
|
|
FileSystem fileSys = null;
|
|
|
String testName = "TestSplitPlacement";
|
|
|
try {
|
|
|
- /* Start 3 datanodes, one each in rack r1, r2, r3. Create three files
|
|
|
- * 1) file1, just after starting the datanode on r1, with
|
|
|
+ /* Start 3 datanodes, one each in rack r1, r2, r3. Create five files
|
|
|
+ * 1) file1 and file5, just after starting the datanode on r1, with
|
|
|
* a repl factor of 1, and,
|
|
|
* 2) file2, just after starting the datanode on r2, with
|
|
|
* a repl factor of 2, and,
|
|
|
- * 3) file3 after starting the all three datanodes, with a repl
|
|
|
+ * 3) file3, file4 after starting the all three datanodes, with a repl
|
|
|
* factor of 3.
|
|
|
- * At the end, file1 will be present on only datanode1, file2 will be
|
|
|
- * present on datanode 1 and datanode2 and
|
|
|
- * file3 will be present on all datanodes.
|
|
|
+ * At the end, file1, file5 will be present on only datanode1, file2 will
|
|
|
+ * be present on datanode 1 and datanode2 and
|
|
|
+ * file3, file4 will be present on all datanodes.
|
|
|
*/
|
|
|
JobConf conf = new JobConf();
|
|
|
conf.setBoolean("dfs.replication.considerLoad", false);
|
|
@@ -111,6 +137,30 @@ public class TestCombineFileInputFormat extends TestCase{
|
|
|
}
|
|
|
Path file1 = new Path(dir1 + "/file1");
|
|
|
writeFile(conf, file1, (short)1, 1);
|
|
|
+ // create another file on the same datanode
|
|
|
+ Path file5 = new Path(dir5 + "/file5");
|
|
|
+ writeFile(conf, file5, (short)1, 1);
|
|
|
+ // split it using a CombinedFile input format
|
|
|
+ DummyInputFormat inFormat = new DummyInputFormat();
|
|
|
+ JobConf job = new JobConf(conf);
|
|
|
+ FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
|
|
|
+ InputSplit[] splits = inFormat.getSplits(job, 1);
|
|
|
+ System.out.println("Made splits(Test0): " + splits.length);
|
|
|
+ for (InputSplit split : splits) {
|
|
|
+ System.out.println("File split(Test0): " + split);
|
|
|
+ }
|
|
|
+ assertEquals(splits.length, 1);
|
|
|
+ CombineFileSplit fileSplit = (CombineFileSplit) splits[0];
|
|
|
+ assertEquals(2, fileSplit.getNumPaths());
|
|
|
+ assertEquals(1, fileSplit.getLocations().length);
|
|
|
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(0));
|
|
|
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
|
|
|
+ assertEquals(file5.getName(), fileSplit.getPath(1).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(1));
|
|
|
+ assertEquals(BLOCKSIZE, fileSplit.getLength(1));
|
|
|
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
|
|
|
+
|
|
|
dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
|
|
|
dfs.waitActive();
|
|
|
|
|
@@ -119,14 +169,14 @@ public class TestCombineFileInputFormat extends TestCase{
|
|
|
writeFile(conf, file2, (short)2, 2);
|
|
|
|
|
|
// split it using a CombinedFile input format
|
|
|
- DummyInputFormat inFormat = new DummyInputFormat();
|
|
|
+ inFormat = new DummyInputFormat();
|
|
|
inFormat.setInputPaths(conf, dir1 + "," + dir2);
|
|
|
inFormat.setMinSplitSizeRack(BLOCKSIZE);
|
|
|
- InputSplit[] splits = inFormat.getSplits(conf, 1);
|
|
|
+ splits = inFormat.getSplits(conf, 1);
|
|
|
System.out.println("Made splits(Test1): " + splits.length);
|
|
|
|
|
|
// make sure that each split has different locations
|
|
|
- CombineFileSplit fileSplit = null;
|
|
|
+ fileSplit = null;
|
|
|
for (int i = 0; i < splits.length; ++i) {
|
|
|
fileSplit = (CombineFileSplit) splits[i];
|
|
|
System.out.println("File split(Test1): " + fileSplit);
|
|
@@ -436,7 +486,7 @@ public class TestCombineFileInputFormat extends TestCase{
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
static void writeFile(Configuration conf, Path name,
|
|
|
short replication, int numBlocks) throws IOException {
|
|
|
FileSystem fileSys = FileSystem.get(conf);
|
|
@@ -444,12 +494,409 @@ public class TestCombineFileInputFormat extends TestCase{
|
|
|
FSDataOutputStream stm = fileSys.create(name, true,
|
|
|
conf.getInt("io.file.buffer.size", 4096),
|
|
|
replication, (long)BLOCKSIZE);
|
|
|
+ writeDataAndSetReplication(fileSys, name, stm, replication, numBlocks);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Creates the gzip file and return the FileStatus
|
|
|
+ static FileStatus writeGzipFile(Configuration conf, Path name,
|
|
|
+ short replication, int numBlocks) throws IOException {
|
|
|
+ FileSystem fileSys = FileSystem.get(conf);
|
|
|
+
|
|
|
+ GZIPOutputStream out = new GZIPOutputStream(fileSys.create(name, true, conf
|
|
|
+ .getInt("io.file.buffer.size", 4096), replication, (long) BLOCKSIZE));
|
|
|
+ writeDataAndSetReplication(fileSys, name, out, replication, numBlocks);
|
|
|
+ return fileSys.getFileStatus(name);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void writeDataAndSetReplication(FileSystem fileSys, Path name,
|
|
|
+ OutputStream out, short replication, int numBlocks) throws IOException {
|
|
|
for (int i = 0; i < numBlocks; i++) {
|
|
|
- stm.write(databuf);
|
|
|
+ out.write(databuf);
|
|
|
}
|
|
|
- stm.close();
|
|
|
+ out.close();
|
|
|
DFSTestUtil.waitReplication(fileSys, name, replication);
|
|
|
}
|
|
|
+
|
|
|
+ public void testSplitPlacementForCompressedFiles() throws IOException {
|
|
|
+ MiniDFSCluster dfs = null;
|
|
|
+ FileSystem fileSys = null;
|
|
|
+ try {
|
|
|
+ /* Start 3 datanodes, one each in rack r1, r2, r3. Create five gzipped
|
|
|
+ * files
|
|
|
+ * 1) file1 and file5, just after starting the datanode on r1, with
|
|
|
+ * a repl factor of 1, and,
|
|
|
+ * 2) file2, just after starting the datanode on r2, with
|
|
|
+ * a repl factor of 2, and,
|
|
|
+ * 3) file3, file4 after starting the all three datanodes, with a repl
|
|
|
+ * factor of 3.
|
|
|
+ * At the end, file1, file5 will be present on only datanode1, file2 will
|
|
|
+ * be present on datanode 1 and datanode2 and
|
|
|
+ * file3, file4 will be present on all datanodes.
|
|
|
+ */
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setBoolean("dfs.replication.considerLoad", false);
|
|
|
+ dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
|
|
|
+ dfs.waitActive();
|
|
|
+
|
|
|
+ fileSys = dfs.getFileSystem();
|
|
|
+ if (!fileSys.mkdirs(inDir)) {
|
|
|
+ throw new IOException("Mkdirs failed to create " + inDir.toString());
|
|
|
+ }
|
|
|
+ Path file1 = new Path(dir1 + "/file1.gz");
|
|
|
+ FileStatus f1 = writeGzipFile(conf, file1, (short)1, 1);
|
|
|
+ // create another file on the same datanode
|
|
|
+ Path file5 = new Path(dir5 + "/file5.gz");
|
|
|
+ FileStatus f5 = writeGzipFile(conf, file5, (short)1, 1);
|
|
|
+ // split it using a CombinedFile input format
|
|
|
+ DummyInputFormat inFormat = new DummyInputFormat();
|
|
|
+ JobConf job = new JobConf(conf);
|
|
|
+ FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
|
|
|
+ InputSplit[] splits = inFormat.getSplits(job, 1);
|
|
|
+ System.out.println("Made splits(Test0): " + splits.length);
|
|
|
+ for (InputSplit split : splits) {
|
|
|
+ System.out.println("File split(Test0): " + split);
|
|
|
+ }
|
|
|
+ assertEquals(splits.length, 1);
|
|
|
+ CombineFileSplit fileSplit = (CombineFileSplit) splits[0];
|
|
|
+ assertEquals(2, fileSplit.getNumPaths());
|
|
|
+ assertEquals(1, fileSplit.getLocations().length);
|
|
|
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(0));
|
|
|
+ assertEquals(f1.getLen(), fileSplit.getLength(0));
|
|
|
+ assertEquals(file5.getName(), fileSplit.getPath(1).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(1));
|
|
|
+ assertEquals(f5.getLen(), fileSplit.getLength(1));
|
|
|
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
|
|
|
+
|
|
|
+ dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
|
|
|
+ dfs.waitActive();
|
|
|
+
|
|
|
+ // create file on two datanodes.
|
|
|
+ Path file2 = new Path(dir2 + "/file2.gz");
|
|
|
+ FileStatus f2 = writeGzipFile(conf, file2, (short)2, 2);
|
|
|
+
|
|
|
+ // split it using a CombinedFile input format
|
|
|
+ inFormat = new DummyInputFormat();
|
|
|
+ FileInputFormat.setInputPaths(job, dir1 + "," + dir2);
|
|
|
+ inFormat.setMinSplitSizeRack(f1.getLen());
|
|
|
+ splits = inFormat.getSplits(job, 1);
|
|
|
+ System.out.println("Made splits(Test1): " + splits.length);
|
|
|
+
|
|
|
+ // make sure that each split has different locations
|
|
|
+ for (InputSplit split : splits) {
|
|
|
+ System.out.println("File split(Test1): " + split);
|
|
|
+ }
|
|
|
+ assertEquals(2, splits.length);
|
|
|
+ fileSplit = (CombineFileSplit) splits[0];
|
|
|
+ assertEquals(1, fileSplit.getNumPaths());
|
|
|
+ assertEquals(1, fileSplit.getLocations().length);
|
|
|
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(0));
|
|
|
+ assertEquals(f2.getLen(), fileSplit.getLength(0));
|
|
|
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
|
|
|
+ fileSplit = (CombineFileSplit) splits[1];
|
|
|
+ assertEquals(1, fileSplit.getNumPaths());
|
|
|
+ assertEquals(1, fileSplit.getLocations().length);
|
|
|
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(0));
|
|
|
+ assertEquals(f1.getLen(), fileSplit.getLength(0));
|
|
|
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
|
|
|
+
|
|
|
+ // create another file on 3 datanodes and 3 racks.
|
|
|
+ dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
|
|
|
+ dfs.waitActive();
|
|
|
+ Path file3 = new Path(dir3 + "/file3.gz");
|
|
|
+ FileStatus f3 = writeGzipFile(conf, file3, (short)3, 3);
|
|
|
+ inFormat = new DummyInputFormat();
|
|
|
+ FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3);
|
|
|
+ inFormat.setMinSplitSizeRack(f1.getLen());
|
|
|
+ splits = inFormat.getSplits(job, 1);
|
|
|
+ for (InputSplit split : splits) {
|
|
|
+ System.out.println("File split(Test2): " + split);
|
|
|
+ }
|
|
|
+ assertEquals(3, splits.length);
|
|
|
+ fileSplit = (CombineFileSplit) splits[0];
|
|
|
+ assertEquals(1, fileSplit.getNumPaths());
|
|
|
+ assertEquals(1, fileSplit.getLocations().length);
|
|
|
+ assertEquals(file3.getName(), fileSplit.getPath(0).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(0));
|
|
|
+ assertEquals(f3.getLen(), fileSplit.getLength(0));
|
|
|
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
|
|
|
+ fileSplit = (CombineFileSplit) splits[1];
|
|
|
+ assertEquals(1, fileSplit.getNumPaths());
|
|
|
+ assertEquals(1, fileSplit.getLocations().length);
|
|
|
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(0));
|
|
|
+ assertEquals(f2.getLen(), fileSplit.getLength(0));
|
|
|
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
|
|
|
+ fileSplit = (CombineFileSplit) splits[2];
|
|
|
+ assertEquals(1, fileSplit.getNumPaths());
|
|
|
+ assertEquals(1, fileSplit.getLocations().length);
|
|
|
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(0));
|
|
|
+ assertEquals(f1.getLen(), fileSplit.getLength(0));
|
|
|
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
|
|
|
+
|
|
|
+ // create file4 on all three racks
|
|
|
+ Path file4 = new Path(dir4 + "/file4.gz");
|
|
|
+ FileStatus f4 = writeGzipFile(conf, file4, (short)3, 3);
|
|
|
+ inFormat = new DummyInputFormat();
|
|
|
+ FileInputFormat.setInputPaths(job,
|
|
|
+ dir1 + "," + dir2 + "," + dir3 + "," + dir4);
|
|
|
+ inFormat.setMinSplitSizeRack(f1.getLen());
|
|
|
+ splits = inFormat.getSplits(job, 1);
|
|
|
+ for (InputSplit split : splits) {
|
|
|
+ System.out.println("File split(Test3): " + split);
|
|
|
+ }
|
|
|
+ assertEquals(3, splits.length);
|
|
|
+ fileSplit = (CombineFileSplit) splits[0];
|
|
|
+ assertEquals(2, fileSplit.getNumPaths());
|
|
|
+ assertEquals(1, fileSplit.getLocations().length);
|
|
|
+ assertEquals(file3.getName(), fileSplit.getPath(0).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(0));
|
|
|
+ assertEquals(f3.getLen(), fileSplit.getLength(0));
|
|
|
+ assertEquals(file4.getName(), fileSplit.getPath(1).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(1));
|
|
|
+ assertEquals(f4.getLen(), fileSplit.getLength(1));
|
|
|
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
|
|
|
+ fileSplit = (CombineFileSplit) splits[1];
|
|
|
+ assertEquals(1, fileSplit.getNumPaths());
|
|
|
+ assertEquals(1, fileSplit.getLocations().length);
|
|
|
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(0));
|
|
|
+ assertEquals(f2.getLen(), fileSplit.getLength(0));
|
|
|
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
|
|
|
+ fileSplit = (CombineFileSplit) splits[2];
|
|
|
+ assertEquals(1, fileSplit.getNumPaths());
|
|
|
+ assertEquals(1, fileSplit.getLocations().length);
|
|
|
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(0));
|
|
|
+ assertEquals(f1.getLen(), fileSplit.getLength(0));
|
|
|
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
|
|
|
+
|
|
|
+ // maximum split size is file1's length
|
|
|
+ inFormat = new DummyInputFormat();
|
|
|
+ inFormat.setMinSplitSizeNode(f1.getLen());
|
|
|
+ inFormat.setMaxSplitSize(f1.getLen());
|
|
|
+ FileInputFormat.setInputPaths(job,
|
|
|
+ dir1 + "," + dir2 + "," + dir3 + "," + dir4);
|
|
|
+ splits = inFormat.getSplits(job, 1);
|
|
|
+ for (InputSplit split : splits) {
|
|
|
+ System.out.println("File split(Test4): " + split);
|
|
|
+ }
|
|
|
+ assertEquals(4, splits.length);
|
|
|
+ fileSplit = (CombineFileSplit) splits[0];
|
|
|
+ assertEquals(1, fileSplit.getNumPaths());
|
|
|
+ assertEquals(1, fileSplit.getLocations().length);
|
|
|
+ assertEquals(file3.getName(), fileSplit.getPath(0).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(0));
|
|
|
+ assertEquals(f3.getLen(), fileSplit.getLength(0));
|
|
|
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
|
|
|
+ fileSplit = (CombineFileSplit) splits[1];
|
|
|
+ assertEquals(file4.getName(), fileSplit.getPath(0).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(0));
|
|
|
+ assertEquals(f4.getLen(), fileSplit.getLength(0));
|
|
|
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
|
|
|
+ fileSplit = (CombineFileSplit) splits[2];
|
|
|
+ assertEquals(1, fileSplit.getNumPaths());
|
|
|
+ assertEquals(1, fileSplit.getLocations().length);
|
|
|
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(0));
|
|
|
+ assertEquals(f2.getLen(), fileSplit.getLength(0));
|
|
|
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
|
|
|
+ fileSplit = (CombineFileSplit) splits[3];
|
|
|
+ assertEquals(1, fileSplit.getNumPaths());
|
|
|
+ assertEquals(1, fileSplit.getLocations().length);
|
|
|
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(0));
|
|
|
+ assertEquals(f1.getLen(), fileSplit.getLength(0));
|
|
|
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
|
|
|
+
|
|
|
+ // maximum split size is twice file1's length
|
|
|
+ inFormat = new DummyInputFormat();
|
|
|
+ inFormat.setMinSplitSizeNode(f1.getLen());
|
|
|
+ inFormat.setMaxSplitSize(2 * f1.getLen());
|
|
|
+ FileInputFormat.setInputPaths(job,
|
|
|
+ dir1 + "," + dir2 + "," + dir3 + "," + dir4);
|
|
|
+ splits = inFormat.getSplits(job, 1);
|
|
|
+ for (InputSplit split : splits) {
|
|
|
+ System.out.println("File split(Test5): " + split);
|
|
|
+ }
|
|
|
+ assertEquals(3, splits.length);
|
|
|
+ fileSplit = (CombineFileSplit) splits[0];
|
|
|
+ assertEquals(2, fileSplit.getNumPaths());
|
|
|
+ assertEquals(1, fileSplit.getLocations().length);
|
|
|
+ assertEquals(file3.getName(), fileSplit.getPath(0).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(0));
|
|
|
+ assertEquals(f3.getLen(), fileSplit.getLength(0));
|
|
|
+ assertEquals(file4.getName(), fileSplit.getPath(1).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(1));
|
|
|
+ assertEquals(f4.getLen(), fileSplit.getLength(1));
|
|
|
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]);
|
|
|
+ fileSplit = (CombineFileSplit) splits[1];
|
|
|
+ assertEquals(1, fileSplit.getNumPaths());
|
|
|
+ assertEquals(1, fileSplit.getLocations().length);
|
|
|
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(0));
|
|
|
+ assertEquals(f2.getLen(), fileSplit.getLength(0));
|
|
|
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
|
|
|
+ fileSplit = (CombineFileSplit) splits[2];
|
|
|
+ assertEquals(1, fileSplit.getNumPaths());
|
|
|
+ assertEquals(1, fileSplit.getLocations().length);
|
|
|
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(0));
|
|
|
+ assertEquals(f1.getLen(), fileSplit.getLength(0));
|
|
|
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
|
|
|
+
|
|
|
+ // maximum split size is 4 times file1's length
|
|
|
+ inFormat = new DummyInputFormat();
|
|
|
+ inFormat.setMinSplitSizeNode(2 * f1.getLen());
|
|
|
+ inFormat.setMaxSplitSize(4 * f1.getLen());
|
|
|
+ FileInputFormat.setInputPaths(job,
|
|
|
+ dir1 + "," + dir2 + "," + dir3 + "," + dir4);
|
|
|
+ splits = inFormat.getSplits(job, 1);
|
|
|
+ for (InputSplit split : splits) {
|
|
|
+ System.out.println("File split(Test6): " + split);
|
|
|
+ }
|
|
|
+ assertEquals(2, splits.length);
|
|
|
+ fileSplit = (CombineFileSplit) splits[0];
|
|
|
+ assertEquals(2, fileSplit.getNumPaths());
|
|
|
+ assertEquals(1, fileSplit.getLocations().length);
|
|
|
+ assertEquals(file3.getName(), fileSplit.getPath(0).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(0));
|
|
|
+ assertEquals(f3.getLen(), fileSplit.getLength(0));
|
|
|
+ assertEquals(file4.getName(), fileSplit.getPath(1).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(1));
|
|
|
+ assertEquals(f4.getLen(), fileSplit.getLength(1));
|
|
|
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]);
|
|
|
+ fileSplit = (CombineFileSplit) splits[1];
|
|
|
+ assertEquals(2, fileSplit.getNumPaths());
|
|
|
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(0));
|
|
|
+ assertEquals(f1.getLen(), fileSplit.getLength(0));
|
|
|
+ assertEquals(file2.getName(), fileSplit.getPath(1).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(1), BLOCKSIZE);
|
|
|
+ assertEquals(f2.getLen(), fileSplit.getLength(1));
|
|
|
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
|
|
|
+
|
|
|
+ // maximum split size and min-split-size per rack is 4 times file1's length
|
|
|
+ inFormat = new DummyInputFormat();
|
|
|
+ inFormat.setMaxSplitSize(4 * f1.getLen());
|
|
|
+ inFormat.setMinSplitSizeRack(4 * f1.getLen());
|
|
|
+ FileInputFormat.setInputPaths(job,
|
|
|
+ dir1 + "," + dir2 + "," + dir3 + "," + dir4);
|
|
|
+ splits = inFormat.getSplits(job, 1);
|
|
|
+ for (InputSplit split : splits) {
|
|
|
+ System.out.println("File split(Test7): " + split);
|
|
|
+ }
|
|
|
+ assertEquals(1, splits.length);
|
|
|
+ fileSplit = (CombineFileSplit) splits[0];
|
|
|
+ assertEquals(4, fileSplit.getNumPaths());
|
|
|
+ assertEquals(1, fileSplit.getLocations().length);
|
|
|
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
|
|
|
+
|
|
|
+ // minimum split size per node is 4 times file1's length
|
|
|
+ inFormat = new DummyInputFormat();
|
|
|
+ inFormat.setMinSplitSizeNode(4 * f1.getLen());
|
|
|
+ FileInputFormat.setInputPaths(job,
|
|
|
+ dir1 + "," + dir2 + "," + dir3 + "," + dir4);
|
|
|
+ splits = inFormat.getSplits(job, 1);
|
|
|
+ for (InputSplit split : splits) {
|
|
|
+ System.out.println("File split(Test8): " + split);
|
|
|
+ }
|
|
|
+ assertEquals(1, splits.length);
|
|
|
+ fileSplit = (CombineFileSplit) splits[0];
|
|
|
+ assertEquals(4, fileSplit.getNumPaths());
|
|
|
+ assertEquals(1, fileSplit.getLocations().length);
|
|
|
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
|
|
|
+
|
|
|
+ // Rack 1 has file1, file2 and file3 and file4
|
|
|
+ // Rack 2 has file2 and file3 and file4
|
|
|
+ // Rack 3 has file3 and file4
|
|
|
+ // setup a filter so that only file1 and file2 can be combined
|
|
|
+ inFormat = new DummyInputFormat();
|
|
|
+ FileInputFormat.addInputPath(job, inDir);
|
|
|
+ inFormat.setMinSplitSizeRack(1); // everything is at least rack local
|
|
|
+ inFormat.createPool(job, new TestFilter(dir1),
|
|
|
+ new TestFilter(dir2));
|
|
|
+ splits = inFormat.getSplits(job, 1);
|
|
|
+ for (InputSplit split : splits) {
|
|
|
+ System.out.println("File split(Test9): " + split);
|
|
|
+ }
|
|
|
+ assertEquals(3, splits.length);
|
|
|
+ fileSplit = (CombineFileSplit) splits[0];
|
|
|
+ assertEquals(1, fileSplit.getNumPaths());
|
|
|
+ assertEquals(1, fileSplit.getLocations().length);
|
|
|
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
|
|
|
+ fileSplit = (CombineFileSplit) splits[1];
|
|
|
+ assertEquals(1, fileSplit.getNumPaths());
|
|
|
+ assertEquals(1, fileSplit.getLocations().length);
|
|
|
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
|
|
|
+ fileSplit = (CombineFileSplit) splits[2];
|
|
|
+ assertEquals(2, fileSplit.getNumPaths());
|
|
|
+ assertEquals(1, fileSplit.getLocations().length);
|
|
|
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
|
|
|
+
|
|
|
+ // measure performance when there are multiple pools and
|
|
|
+ // many files in each pool.
|
|
|
+ int numPools = 100;
|
|
|
+ int numFiles = 1000;
|
|
|
+ DummyInputFormat1 inFormat1 = new DummyInputFormat1();
|
|
|
+ for (int i = 0; i < numFiles; i++) {
|
|
|
+ FileInputFormat.setInputPaths(job, file1);
|
|
|
+ }
|
|
|
+ inFormat1.setMinSplitSizeRack(1); // everything is at least rack local
|
|
|
+ final Path dirNoMatch1 = new Path(inDir, "/dirxx");
|
|
|
+ final Path dirNoMatch2 = new Path(inDir, "/diryy");
|
|
|
+ for (int i = 0; i < numPools; i++) {
|
|
|
+ inFormat1.createPool(job, new TestFilter(dirNoMatch1),
|
|
|
+ new TestFilter(dirNoMatch2));
|
|
|
+ }
|
|
|
+ long start = System.currentTimeMillis();
|
|
|
+ splits = inFormat1.getSplits(job, 1);
|
|
|
+ long end = System.currentTimeMillis();
|
|
|
+ System.out.println("Elapsed time for " + numPools + " pools " +
|
|
|
+ " and " + numFiles + " files is " +
|
|
|
+ ((end - start)) + " milli seconds.");
|
|
|
+ } finally {
|
|
|
+ if (dfs != null) {
|
|
|
+ dfs.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test when input files are from non-default file systems
|
|
|
+ */
|
|
|
+ public void testForNonDefaultFileSystem() throws Throwable {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+
|
|
|
+ // use a fake file system scheme as default
|
|
|
+ conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, DUMMY_FS_URI);
|
|
|
+
|
|
|
+ // default fs path
|
|
|
+ assertEquals(DUMMY_FS_URI, FileSystem.getDefaultUri(conf).toString());
|
|
|
+ // add a local file
|
|
|
+ Path localPath = new Path("testFile1");
|
|
|
+ FileSystem lfs = FileSystem.getLocal(conf);
|
|
|
+ FSDataOutputStream dos = lfs.create(localPath);
|
|
|
+ dos.writeChars("Local file for CFIF");
|
|
|
+ dos.close();
|
|
|
+
|
|
|
+ conf.set("mapred.working.dir", "/");
|
|
|
+ JobConf job = new JobConf(conf);
|
|
|
+
|
|
|
+ FileInputFormat.setInputPaths(job, lfs.makeQualified(localPath));
|
|
|
+ DummyInputFormat inFormat = new DummyInputFormat();
|
|
|
+ InputSplit[] splits = inFormat.getSplits(job, 1);
|
|
|
+ assertTrue(splits.length > 0);
|
|
|
+ for (InputSplit s : splits) {
|
|
|
+ CombineFileSplit cfs = (CombineFileSplit)s;
|
|
|
+ for (Path p : cfs.getPaths()) {
|
|
|
+ assertEquals(p.toUri().getScheme(), "file");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
static class TestFilter implements PathFilter {
|
|
|
private Path p;
|
|
@@ -462,7 +909,7 @@ public class TestCombineFileInputFormat extends TestCase{
|
|
|
// returns true if the specified path matches the prefix stored
|
|
|
// in this TestFilter.
|
|
|
public boolean accept(Path path) {
|
|
|
- if (path.toString().indexOf(p.toString()) == 0) {
|
|
|
+ if (path.toUri().getPath().indexOf(p.toString()) == 0) {
|
|
|
return true;
|
|
|
}
|
|
|
return false;
|