|
@@ -18,17 +18,12 @@
|
|
|
package org.apache.hadoop.mapred.lib;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
-import java.io.OutputStream;
|
|
|
-import java.util.List;
|
|
|
-import java.util.zip.GZIPOutputStream;
|
|
|
|
|
|
import junit.framework.TestCase;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
-import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
-import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.io.Text;
|
|
@@ -37,7 +32,6 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
|
|
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.PathFilter;
|
|
|
-import org.apache.hadoop.mapred.FileInputFormat;
|
|
|
import org.apache.hadoop.mapred.InputSplit;
|
|
|
import org.apache.hadoop.mapred.JobConf;
|
|
|
import org.apache.hadoop.mapred.Reporter;
|
|
@@ -70,11 +64,9 @@ public class TestCombineFileInputFormat extends TestCase{
|
|
|
final Path dir2 = new Path(inDir, "/dir2");
|
|
|
final Path dir3 = new Path(inDir, "/dir3");
|
|
|
final Path dir4 = new Path(inDir, "/dir4");
|
|
|
- final Path dir5 = new Path(inDir, "/dir5");
|
|
|
|
|
|
static final int BLOCKSIZE = 1024;
|
|
|
static final byte[] databuf = new byte[BLOCKSIZE];
|
|
|
- private static final String DUMMY_FS_URI = "dummyfs:///";
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(TestCombineFileInputFormat.class);
|
|
|
|
|
@@ -86,24 +78,6 @@ public class TestCombineFileInputFormat extends TestCase{
|
|
|
return null;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- /** Dummy class to extend CombineFileInputFormat. It allows
|
|
|
- * non-existent files to be passed into the CombineFileInputFormat, allows
|
|
|
- * for easy testing without having to create real files.
|
|
|
- */
|
|
|
- private class DummyInputFormat1 extends DummyInputFormat {
|
|
|
- @Override
|
|
|
- protected FileStatus[] listStatus(JobConf job) throws IOException {
|
|
|
- Path[] files = getInputPaths(job);
|
|
|
- FileStatus[] results = new FileStatus[files.length];
|
|
|
- for (int i = 0; i < files.length; i++) {
|
|
|
- Path p = files[i];
|
|
|
- FileSystem fs = p.getFileSystem(job);
|
|
|
- results[i] = fs.getFileStatus(p);
|
|
|
- }
|
|
|
- return results;
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
public void testSplitPlacement() throws IOException {
|
|
|
String namenode = null;
|
|
@@ -112,16 +86,16 @@ public class TestCombineFileInputFormat extends TestCase{
|
|
|
FileSystem fileSys = null;
|
|
|
String testName = "TestSplitPlacement";
|
|
|
try {
|
|
|
- /* Start 3 datanodes, one each in rack r1, r2, r3. Create five files
|
|
|
- * 1) file1 and file5, just after starting the datanode on r1, with
|
|
|
+ /* Start 3 datanodes, one each in rack r1, r2, r3. Create three files
|
|
|
+ * 1) file1, just after starting the datanode on r1, with
|
|
|
* a repl factor of 1, and,
|
|
|
* 2) file2, just after starting the datanode on r2, with
|
|
|
* a repl factor of 2, and,
|
|
|
- * 3) file3, file4 after starting the all three datanodes, with a repl
|
|
|
+ * 3) file3, after starting all three datanodes, with a repl
|
|
|
* factor of 3.
|
|
|
- * At the end, file1, file5 will be present on only datanode1, file2 will
|
|
|
- * be present on datanode 1 and datanode2 and
|
|
|
- * file3, file4 will be present on all datanodes.
|
|
|
+ * At the end, file1 will be present on only datanode1, file2 will be
|
|
|
+ * present on datanode 1 and datanode2 and
|
|
|
+ * file3 will be present on all datanodes.
|
|
|
*/
|
|
|
JobConf conf = new JobConf();
|
|
|
conf.setBoolean("dfs.replication.considerLoad", false);
|
|
@@ -137,30 +111,6 @@ public class TestCombineFileInputFormat extends TestCase{
|
|
|
}
|
|
|
Path file1 = new Path(dir1 + "/file1");
|
|
|
writeFile(conf, file1, (short)1, 1);
|
|
|
- // create another file on the same datanode
|
|
|
- Path file5 = new Path(dir5 + "/file5");
|
|
|
- writeFile(conf, file5, (short)1, 1);
|
|
|
- // split it using a CombinedFile input format
|
|
|
- DummyInputFormat inFormat = new DummyInputFormat();
|
|
|
- JobConf job = new JobConf(conf);
|
|
|
- FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
|
|
|
- InputSplit[] splits = inFormat.getSplits(job, 1);
|
|
|
- System.out.println("Made splits(Test0): " + splits.length);
|
|
|
- for (InputSplit split : splits) {
|
|
|
- System.out.println("File split(Test0): " + split);
|
|
|
- }
|
|
|
- assertEquals(splits.length, 1);
|
|
|
- CombineFileSplit fileSplit = (CombineFileSplit) splits[0];
|
|
|
- assertEquals(2, fileSplit.getNumPaths());
|
|
|
- assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(0));
|
|
|
- assertEquals(BLOCKSIZE, fileSplit.getLength(0));
|
|
|
- assertEquals(file5.getName(), fileSplit.getPath(1).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(1));
|
|
|
- assertEquals(BLOCKSIZE, fileSplit.getLength(1));
|
|
|
- assertEquals(hosts1[0], fileSplit.getLocations()[0]);
|
|
|
-
|
|
|
dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
|
|
|
dfs.waitActive();
|
|
|
|
|
@@ -169,14 +119,14 @@ public class TestCombineFileInputFormat extends TestCase{
|
|
|
writeFile(conf, file2, (short)2, 2);
|
|
|
|
|
|
// split it using a CombinedFile input format
|
|
|
- inFormat = new DummyInputFormat();
|
|
|
+ DummyInputFormat inFormat = new DummyInputFormat();
|
|
|
inFormat.setInputPaths(conf, dir1 + "," + dir2);
|
|
|
inFormat.setMinSplitSizeRack(BLOCKSIZE);
|
|
|
- splits = inFormat.getSplits(conf, 1);
|
|
|
+ InputSplit[] splits = inFormat.getSplits(conf, 1);
|
|
|
System.out.println("Made splits(Test1): " + splits.length);
|
|
|
|
|
|
// make sure that each split has different locations
|
|
|
- fileSplit = null;
|
|
|
+ CombineFileSplit fileSplit = null;
|
|
|
for (int i = 0; i < splits.length; ++i) {
|
|
|
fileSplit = (CombineFileSplit) splits[i];
|
|
|
System.out.println("File split(Test1): " + fileSplit);
|
|
@@ -486,7 +436,7 @@ public class TestCombineFileInputFormat extends TestCase{
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
static void writeFile(Configuration conf, Path name,
|
|
|
short replication, int numBlocks) throws IOException {
|
|
|
FileSystem fileSys = FileSystem.get(conf);
|
|
@@ -494,409 +444,12 @@ public class TestCombineFileInputFormat extends TestCase{
|
|
|
FSDataOutputStream stm = fileSys.create(name, true,
|
|
|
conf.getInt("io.file.buffer.size", 4096),
|
|
|
replication, (long)BLOCKSIZE);
|
|
|
- writeDataAndSetReplication(fileSys, name, stm, replication, numBlocks);
|
|
|
- }
|
|
|
-
|
|
|
- // Creates the gzip file and return the FileStatus
|
|
|
- static FileStatus writeGzipFile(Configuration conf, Path name,
|
|
|
- short replication, int numBlocks) throws IOException {
|
|
|
- FileSystem fileSys = FileSystem.get(conf);
|
|
|
-
|
|
|
- GZIPOutputStream out = new GZIPOutputStream(fileSys.create(name, true, conf
|
|
|
- .getInt("io.file.buffer.size", 4096), replication, (long) BLOCKSIZE));
|
|
|
- writeDataAndSetReplication(fileSys, name, out, replication, numBlocks);
|
|
|
- return fileSys.getFileStatus(name);
|
|
|
- }
|
|
|
-
|
|
|
- private static void writeDataAndSetReplication(FileSystem fileSys, Path name,
|
|
|
- OutputStream out, short replication, int numBlocks) throws IOException {
|
|
|
for (int i = 0; i < numBlocks; i++) {
|
|
|
- out.write(databuf);
|
|
|
+ stm.write(databuf);
|
|
|
}
|
|
|
- out.close();
|
|
|
+ stm.close();
|
|
|
DFSTestUtil.waitReplication(fileSys, name, replication);
|
|
|
}
|
|
|
-
|
|
|
- public void testSplitPlacementForCompressedFiles() throws IOException {
|
|
|
- MiniDFSCluster dfs = null;
|
|
|
- FileSystem fileSys = null;
|
|
|
- try {
|
|
|
- /* Start 3 datanodes, one each in rack r1, r2, r3. Create five gzipped
|
|
|
- * files
|
|
|
- * 1) file1 and file5, just after starting the datanode on r1, with
|
|
|
- * a repl factor of 1, and,
|
|
|
- * 2) file2, just after starting the datanode on r2, with
|
|
|
- * a repl factor of 2, and,
|
|
|
- * 3) file3, file4 after starting the all three datanodes, with a repl
|
|
|
- * factor of 3.
|
|
|
- * At the end, file1, file5 will be present on only datanode1, file2 will
|
|
|
- * be present on datanode 1 and datanode2 and
|
|
|
- * file3, file4 will be present on all datanodes.
|
|
|
- */
|
|
|
- Configuration conf = new Configuration();
|
|
|
- conf.setBoolean("dfs.replication.considerLoad", false);
|
|
|
- dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
|
|
|
- dfs.waitActive();
|
|
|
-
|
|
|
- fileSys = dfs.getFileSystem();
|
|
|
- if (!fileSys.mkdirs(inDir)) {
|
|
|
- throw new IOException("Mkdirs failed to create " + inDir.toString());
|
|
|
- }
|
|
|
- Path file1 = new Path(dir1 + "/file1.gz");
|
|
|
- FileStatus f1 = writeGzipFile(conf, file1, (short)1, 1);
|
|
|
- // create another file on the same datanode
|
|
|
- Path file5 = new Path(dir5 + "/file5.gz");
|
|
|
- FileStatus f5 = writeGzipFile(conf, file5, (short)1, 1);
|
|
|
- // split it using a CombinedFile input format
|
|
|
- DummyInputFormat inFormat = new DummyInputFormat();
|
|
|
- JobConf job = new JobConf(conf);
|
|
|
- FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
|
|
|
- InputSplit[] splits = inFormat.getSplits(job, 1);
|
|
|
- System.out.println("Made splits(Test0): " + splits.length);
|
|
|
- for (InputSplit split : splits) {
|
|
|
- System.out.println("File split(Test0): " + split);
|
|
|
- }
|
|
|
- assertEquals(splits.length, 1);
|
|
|
- CombineFileSplit fileSplit = (CombineFileSplit) splits[0];
|
|
|
- assertEquals(2, fileSplit.getNumPaths());
|
|
|
- assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(0));
|
|
|
- assertEquals(f1.getLen(), fileSplit.getLength(0));
|
|
|
- assertEquals(file5.getName(), fileSplit.getPath(1).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(1));
|
|
|
- assertEquals(f5.getLen(), fileSplit.getLength(1));
|
|
|
- assertEquals(hosts1[0], fileSplit.getLocations()[0]);
|
|
|
-
|
|
|
- dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
|
|
|
- dfs.waitActive();
|
|
|
-
|
|
|
- // create file on two datanodes.
|
|
|
- Path file2 = new Path(dir2 + "/file2.gz");
|
|
|
- FileStatus f2 = writeGzipFile(conf, file2, (short)2, 2);
|
|
|
-
|
|
|
- // split it using a CombinedFile input format
|
|
|
- inFormat = new DummyInputFormat();
|
|
|
- FileInputFormat.setInputPaths(job, dir1 + "," + dir2);
|
|
|
- inFormat.setMinSplitSizeRack(f1.getLen());
|
|
|
- splits = inFormat.getSplits(job, 1);
|
|
|
- System.out.println("Made splits(Test1): " + splits.length);
|
|
|
-
|
|
|
- // make sure that each split has different locations
|
|
|
- for (InputSplit split : splits) {
|
|
|
- System.out.println("File split(Test1): " + split);
|
|
|
- }
|
|
|
- assertEquals(2, splits.length);
|
|
|
- fileSplit = (CombineFileSplit) splits[0];
|
|
|
- assertEquals(1, fileSplit.getNumPaths());
|
|
|
- assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(file2.getName(), fileSplit.getPath(0).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(0));
|
|
|
- assertEquals(f2.getLen(), fileSplit.getLength(0));
|
|
|
- assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
|
|
|
- fileSplit = (CombineFileSplit) splits[1];
|
|
|
- assertEquals(1, fileSplit.getNumPaths());
|
|
|
- assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(0));
|
|
|
- assertEquals(f1.getLen(), fileSplit.getLength(0));
|
|
|
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
|
|
|
-
|
|
|
- // create another file on 3 datanodes and 3 racks.
|
|
|
- dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
|
|
|
- dfs.waitActive();
|
|
|
- Path file3 = new Path(dir3 + "/file3.gz");
|
|
|
- FileStatus f3 = writeGzipFile(conf, file3, (short)3, 3);
|
|
|
- inFormat = new DummyInputFormat();
|
|
|
- FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3);
|
|
|
- inFormat.setMinSplitSizeRack(f1.getLen());
|
|
|
- splits = inFormat.getSplits(job, 1);
|
|
|
- for (InputSplit split : splits) {
|
|
|
- System.out.println("File split(Test2): " + split);
|
|
|
- }
|
|
|
- assertEquals(3, splits.length);
|
|
|
- fileSplit = (CombineFileSplit) splits[0];
|
|
|
- assertEquals(1, fileSplit.getNumPaths());
|
|
|
- assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(file3.getName(), fileSplit.getPath(0).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(0));
|
|
|
- assertEquals(f3.getLen(), fileSplit.getLength(0));
|
|
|
- assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
|
|
|
- fileSplit = (CombineFileSplit) splits[1];
|
|
|
- assertEquals(1, fileSplit.getNumPaths());
|
|
|
- assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(file2.getName(), fileSplit.getPath(0).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(0));
|
|
|
- assertEquals(f2.getLen(), fileSplit.getLength(0));
|
|
|
- assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
|
|
|
- fileSplit = (CombineFileSplit) splits[2];
|
|
|
- assertEquals(1, fileSplit.getNumPaths());
|
|
|
- assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(0));
|
|
|
- assertEquals(f1.getLen(), fileSplit.getLength(0));
|
|
|
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
|
|
|
-
|
|
|
- // create file4 on all three racks
|
|
|
- Path file4 = new Path(dir4 + "/file4.gz");
|
|
|
- FileStatus f4 = writeGzipFile(conf, file4, (short)3, 3);
|
|
|
- inFormat = new DummyInputFormat();
|
|
|
- FileInputFormat.setInputPaths(job,
|
|
|
- dir1 + "," + dir2 + "," + dir3 + "," + dir4);
|
|
|
- inFormat.setMinSplitSizeRack(f1.getLen());
|
|
|
- splits = inFormat.getSplits(job, 1);
|
|
|
- for (InputSplit split : splits) {
|
|
|
- System.out.println("File split(Test3): " + split);
|
|
|
- }
|
|
|
- assertEquals(3, splits.length);
|
|
|
- fileSplit = (CombineFileSplit) splits[0];
|
|
|
- assertEquals(2, fileSplit.getNumPaths());
|
|
|
- assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(file3.getName(), fileSplit.getPath(0).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(0));
|
|
|
- assertEquals(f3.getLen(), fileSplit.getLength(0));
|
|
|
- assertEquals(file4.getName(), fileSplit.getPath(1).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(1));
|
|
|
- assertEquals(f4.getLen(), fileSplit.getLength(1));
|
|
|
- assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
|
|
|
- fileSplit = (CombineFileSplit) splits[1];
|
|
|
- assertEquals(1, fileSplit.getNumPaths());
|
|
|
- assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(file2.getName(), fileSplit.getPath(0).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(0));
|
|
|
- assertEquals(f2.getLen(), fileSplit.getLength(0));
|
|
|
- assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
|
|
|
- fileSplit = (CombineFileSplit) splits[2];
|
|
|
- assertEquals(1, fileSplit.getNumPaths());
|
|
|
- assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(0));
|
|
|
- assertEquals(f1.getLen(), fileSplit.getLength(0));
|
|
|
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
|
|
|
-
|
|
|
- // maximum split size is file1's length
|
|
|
- inFormat = new DummyInputFormat();
|
|
|
- inFormat.setMinSplitSizeNode(f1.getLen());
|
|
|
- inFormat.setMaxSplitSize(f1.getLen());
|
|
|
- FileInputFormat.setInputPaths(job,
|
|
|
- dir1 + "," + dir2 + "," + dir3 + "," + dir4);
|
|
|
- splits = inFormat.getSplits(job, 1);
|
|
|
- for (InputSplit split : splits) {
|
|
|
- System.out.println("File split(Test4): " + split);
|
|
|
- }
|
|
|
- assertEquals(4, splits.length);
|
|
|
- fileSplit = (CombineFileSplit) splits[0];
|
|
|
- assertEquals(1, fileSplit.getNumPaths());
|
|
|
- assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(file3.getName(), fileSplit.getPath(0).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(0));
|
|
|
- assertEquals(f3.getLen(), fileSplit.getLength(0));
|
|
|
- assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
|
|
|
- fileSplit = (CombineFileSplit) splits[1];
|
|
|
- assertEquals(file4.getName(), fileSplit.getPath(0).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(0));
|
|
|
- assertEquals(f4.getLen(), fileSplit.getLength(0));
|
|
|
- assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
|
|
|
- fileSplit = (CombineFileSplit) splits[2];
|
|
|
- assertEquals(1, fileSplit.getNumPaths());
|
|
|
- assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(file2.getName(), fileSplit.getPath(0).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(0));
|
|
|
- assertEquals(f2.getLen(), fileSplit.getLength(0));
|
|
|
- assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
|
|
|
- fileSplit = (CombineFileSplit) splits[3];
|
|
|
- assertEquals(1, fileSplit.getNumPaths());
|
|
|
- assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(0));
|
|
|
- assertEquals(f1.getLen(), fileSplit.getLength(0));
|
|
|
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
|
|
|
-
|
|
|
- // maximum split size is twice file1's length
|
|
|
- inFormat = new DummyInputFormat();
|
|
|
- inFormat.setMinSplitSizeNode(f1.getLen());
|
|
|
- inFormat.setMaxSplitSize(2 * f1.getLen());
|
|
|
- FileInputFormat.setInputPaths(job,
|
|
|
- dir1 + "," + dir2 + "," + dir3 + "," + dir4);
|
|
|
- splits = inFormat.getSplits(job, 1);
|
|
|
- for (InputSplit split : splits) {
|
|
|
- System.out.println("File split(Test5): " + split);
|
|
|
- }
|
|
|
- assertEquals(3, splits.length);
|
|
|
- fileSplit = (CombineFileSplit) splits[0];
|
|
|
- assertEquals(2, fileSplit.getNumPaths());
|
|
|
- assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(file3.getName(), fileSplit.getPath(0).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(0));
|
|
|
- assertEquals(f3.getLen(), fileSplit.getLength(0));
|
|
|
- assertEquals(file4.getName(), fileSplit.getPath(1).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(1));
|
|
|
- assertEquals(f4.getLen(), fileSplit.getLength(1));
|
|
|
- assertEquals(hosts3[0], fileSplit.getLocations()[0]);
|
|
|
- fileSplit = (CombineFileSplit) splits[1];
|
|
|
- assertEquals(1, fileSplit.getNumPaths());
|
|
|
- assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(file2.getName(), fileSplit.getPath(0).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(0));
|
|
|
- assertEquals(f2.getLen(), fileSplit.getLength(0));
|
|
|
- assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
|
|
|
- fileSplit = (CombineFileSplit) splits[2];
|
|
|
- assertEquals(1, fileSplit.getNumPaths());
|
|
|
- assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(0));
|
|
|
- assertEquals(f1.getLen(), fileSplit.getLength(0));
|
|
|
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
|
|
|
-
|
|
|
- // maximum split size is 4 times file1's length
|
|
|
- inFormat = new DummyInputFormat();
|
|
|
- inFormat.setMinSplitSizeNode(2 * f1.getLen());
|
|
|
- inFormat.setMaxSplitSize(4 * f1.getLen());
|
|
|
- FileInputFormat.setInputPaths(job,
|
|
|
- dir1 + "," + dir2 + "," + dir3 + "," + dir4);
|
|
|
- splits = inFormat.getSplits(job, 1);
|
|
|
- for (InputSplit split : splits) {
|
|
|
- System.out.println("File split(Test6): " + split);
|
|
|
- }
|
|
|
- assertEquals(2, splits.length);
|
|
|
- fileSplit = (CombineFileSplit) splits[0];
|
|
|
- assertEquals(2, fileSplit.getNumPaths());
|
|
|
- assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(file3.getName(), fileSplit.getPath(0).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(0));
|
|
|
- assertEquals(f3.getLen(), fileSplit.getLength(0));
|
|
|
- assertEquals(file4.getName(), fileSplit.getPath(1).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(1));
|
|
|
- assertEquals(f4.getLen(), fileSplit.getLength(1));
|
|
|
- assertEquals(hosts3[0], fileSplit.getLocations()[0]);
|
|
|
- fileSplit = (CombineFileSplit) splits[1];
|
|
|
- assertEquals(2, fileSplit.getNumPaths());
|
|
|
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(0));
|
|
|
- assertEquals(f1.getLen(), fileSplit.getLength(0));
|
|
|
- assertEquals(file2.getName(), fileSplit.getPath(1).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(1), BLOCKSIZE);
|
|
|
- assertEquals(f2.getLen(), fileSplit.getLength(1));
|
|
|
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
|
|
|
-
|
|
|
- // maximum split size and min-split-size per rack is 4 times file1's length
|
|
|
- inFormat = new DummyInputFormat();
|
|
|
- inFormat.setMaxSplitSize(4 * f1.getLen());
|
|
|
- inFormat.setMinSplitSizeRack(4 * f1.getLen());
|
|
|
- FileInputFormat.setInputPaths(job,
|
|
|
- dir1 + "," + dir2 + "," + dir3 + "," + dir4);
|
|
|
- splits = inFormat.getSplits(job, 1);
|
|
|
- for (InputSplit split : splits) {
|
|
|
- System.out.println("File split(Test7): " + split);
|
|
|
- }
|
|
|
- assertEquals(1, splits.length);
|
|
|
- fileSplit = (CombineFileSplit) splits[0];
|
|
|
- assertEquals(4, fileSplit.getNumPaths());
|
|
|
- assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(hosts1[0], fileSplit.getLocations()[0]);
|
|
|
-
|
|
|
- // minimum split size per node is 4 times file1's length
|
|
|
- inFormat = new DummyInputFormat();
|
|
|
- inFormat.setMinSplitSizeNode(4 * f1.getLen());
|
|
|
- FileInputFormat.setInputPaths(job,
|
|
|
- dir1 + "," + dir2 + "," + dir3 + "," + dir4);
|
|
|
- splits = inFormat.getSplits(job, 1);
|
|
|
- for (InputSplit split : splits) {
|
|
|
- System.out.println("File split(Test8): " + split);
|
|
|
- }
|
|
|
- assertEquals(1, splits.length);
|
|
|
- fileSplit = (CombineFileSplit) splits[0];
|
|
|
- assertEquals(4, fileSplit.getNumPaths());
|
|
|
- assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(hosts1[0], fileSplit.getLocations()[0]);
|
|
|
-
|
|
|
- // Rack 1 has file1, file2 and file3 and file4
|
|
|
- // Rack 2 has file2 and file3 and file4
|
|
|
- // Rack 3 has file3 and file4
|
|
|
- // setup a filter so that only file1 and file2 can be combined
|
|
|
- inFormat = new DummyInputFormat();
|
|
|
- FileInputFormat.addInputPath(job, inDir);
|
|
|
- inFormat.setMinSplitSizeRack(1); // everything is at least rack local
|
|
|
- inFormat.createPool(job, new TestFilter(dir1),
|
|
|
- new TestFilter(dir2));
|
|
|
- splits = inFormat.getSplits(job, 1);
|
|
|
- for (InputSplit split : splits) {
|
|
|
- System.out.println("File split(Test9): " + split);
|
|
|
- }
|
|
|
- assertEquals(3, splits.length);
|
|
|
- fileSplit = (CombineFileSplit) splits[0];
|
|
|
- assertEquals(1, fileSplit.getNumPaths());
|
|
|
- assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
|
|
|
- fileSplit = (CombineFileSplit) splits[1];
|
|
|
- assertEquals(1, fileSplit.getNumPaths());
|
|
|
- assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
|
|
|
- fileSplit = (CombineFileSplit) splits[2];
|
|
|
- assertEquals(2, fileSplit.getNumPaths());
|
|
|
- assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
|
|
|
-
|
|
|
- // measure performance when there are multiple pools and
|
|
|
- // many files in each pool.
|
|
|
- int numPools = 100;
|
|
|
- int numFiles = 1000;
|
|
|
- DummyInputFormat1 inFormat1 = new DummyInputFormat1();
|
|
|
- for (int i = 0; i < numFiles; i++) {
|
|
|
- FileInputFormat.setInputPaths(job, file1);
|
|
|
- }
|
|
|
- inFormat1.setMinSplitSizeRack(1); // everything is at least rack local
|
|
|
- final Path dirNoMatch1 = new Path(inDir, "/dirxx");
|
|
|
- final Path dirNoMatch2 = new Path(inDir, "/diryy");
|
|
|
- for (int i = 0; i < numPools; i++) {
|
|
|
- inFormat1.createPool(job, new TestFilter(dirNoMatch1),
|
|
|
- new TestFilter(dirNoMatch2));
|
|
|
- }
|
|
|
- long start = System.currentTimeMillis();
|
|
|
- splits = inFormat1.getSplits(job, 1);
|
|
|
- long end = System.currentTimeMillis();
|
|
|
- System.out.println("Elapsed time for " + numPools + " pools " +
|
|
|
- " and " + numFiles + " files is " +
|
|
|
- ((end - start)) + " milli seconds.");
|
|
|
- } finally {
|
|
|
- if (dfs != null) {
|
|
|
- dfs.shutdown();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Test when input files are from non-default file systems
|
|
|
- */
|
|
|
- public void testForNonDefaultFileSystem() throws Throwable {
|
|
|
- Configuration conf = new Configuration();
|
|
|
-
|
|
|
- // use a fake file system scheme as default
|
|
|
- conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, DUMMY_FS_URI);
|
|
|
-
|
|
|
- // default fs path
|
|
|
- assertEquals(DUMMY_FS_URI, FileSystem.getDefaultUri(conf).toString());
|
|
|
- // add a local file
|
|
|
- Path localPath = new Path("testFile1");
|
|
|
- FileSystem lfs = FileSystem.getLocal(conf);
|
|
|
- FSDataOutputStream dos = lfs.create(localPath);
|
|
|
- dos.writeChars("Local file for CFIF");
|
|
|
- dos.close();
|
|
|
-
|
|
|
- conf.set("mapred.working.dir", "/");
|
|
|
- JobConf job = new JobConf(conf);
|
|
|
-
|
|
|
- FileInputFormat.setInputPaths(job, lfs.makeQualified(localPath));
|
|
|
- DummyInputFormat inFormat = new DummyInputFormat();
|
|
|
- InputSplit[] splits = inFormat.getSplits(job, 1);
|
|
|
- assertTrue(splits.length > 0);
|
|
|
- for (InputSplit s : splits) {
|
|
|
- CombineFileSplit cfs = (CombineFileSplit)s;
|
|
|
- for (Path p : cfs.getPaths()) {
|
|
|
- assertEquals(p.toUri().getScheme(), "file");
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
static class TestFilter implements PathFilter {
|
|
|
private Path p;
|
|
@@ -909,7 +462,7 @@ public class TestCombineFileInputFormat extends TestCase{
|
|
|
// returns true if the specified path matches the prefix stored
|
|
|
// in this TestFilter.
|
|
|
public boolean accept(Path path) {
|
|
|
- if (path.toUri().getPath().indexOf(p.toString()) == 0) {
|
|
|
+ if (path.toString().indexOf(p.toString()) == 0) {
|
|
|
return true;
|
|
|
}
|
|
|
return false;
|