|
@@ -20,11 +20,14 @@ package org.apache.hadoop.mapreduce.lib.input;
|
|
|
import java.io.IOException;
|
|
|
import java.io.OutputStream;
|
|
|
import java.net.URI;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Set;
|
|
|
import java.util.zip.GZIPOutputStream;
|
|
|
import java.util.concurrent.TimeoutException;
|
|
|
|
|
|
+import junit.framework.Assert;
|
|
|
import junit.framework.TestCase;
|
|
|
|
|
|
import org.apache.hadoop.fs.*;
|
|
@@ -42,9 +45,13 @@ import org.apache.hadoop.mapreduce.RecordReader;
|
|
|
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
|
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
|
|
import org.apache.hadoop.mapreduce.TaskType;
|
|
|
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.OneBlockInfo;
|
|
|
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.OneFileInfo;
|
|
|
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
+import com.google.common.collect.HashMultiset;
|
|
|
+
|
|
|
public class TestCombineFileInputFormat extends TestCase {
|
|
|
|
|
|
private static final String rack1[] = new String[] {
|
|
@@ -476,23 +483,23 @@ public class TestCombineFileInputFormat extends TestCase {
|
|
|
assertEquals(BLOCKSIZE, fileSplit.getLength(1));
|
|
|
assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
|
|
|
fileSplit = (CombineFileSplit) splits.get(1);
|
|
|
- assertEquals(file3.getName(), fileSplit.getPath(0).getName());
|
|
|
- assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(0));
|
|
|
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(0));
|
|
|
assertEquals(BLOCKSIZE, fileSplit.getLength(0));
|
|
|
- assertEquals(file4.getName(), fileSplit.getPath(1).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(1));
|
|
|
+ assertEquals(file2.getName(), fileSplit.getPath(1).getName());
|
|
|
+ assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
|
|
|
assertEquals(BLOCKSIZE, fileSplit.getLength(1));
|
|
|
- assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
|
|
|
+ assertEquals("host2.rack2.com", fileSplit.getLocations()[0]);
|
|
|
fileSplit = (CombineFileSplit) splits.get(2);
|
|
|
assertEquals(2, fileSplit.getNumPaths());
|
|
|
assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(file4.getName(), fileSplit.getPath(0).getName());
|
|
|
- assertEquals(BLOCKSIZE, fileSplit.getOffset(0));
|
|
|
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
|
|
|
+ assertEquals(0, fileSplit.getOffset(0));
|
|
|
assertEquals(BLOCKSIZE, fileSplit.getLength(0));
|
|
|
- assertEquals(file4.getName(), fileSplit.getPath(1).getName());
|
|
|
+ assertEquals(file3.getName(), fileSplit.getPath(1).getName());
|
|
|
assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(1));
|
|
|
assertEquals(BLOCKSIZE, fileSplit.getLength(1));
|
|
|
- assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
|
|
|
+ assertEquals("host1.rack1.com", fileSplit.getLocations()[0]);
|
|
|
|
|
|
// maximum split size is 3 blocks
|
|
|
inFormat = new DummyInputFormat();
|
|
@@ -504,7 +511,7 @@ public class TestCombineFileInputFormat extends TestCase {
|
|
|
for (InputSplit split : splits) {
|
|
|
System.out.println("File split(Test5): " + split);
|
|
|
}
|
|
|
- assertEquals(4, splits.size());
|
|
|
+ assertEquals(3, splits.size());
|
|
|
fileSplit = (CombineFileSplit) splits.get(0);
|
|
|
assertEquals(3, fileSplit.getNumPaths());
|
|
|
assertEquals(1, fileSplit.getLocations().length);
|
|
@@ -519,32 +526,28 @@ public class TestCombineFileInputFormat extends TestCase {
|
|
|
assertEquals(BLOCKSIZE, fileSplit.getLength(2));
|
|
|
assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
|
|
|
fileSplit = (CombineFileSplit) splits.get(1);
|
|
|
- assertEquals(file4.getName(), fileSplit.getPath(0).getName());
|
|
|
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
|
|
|
assertEquals(0, fileSplit.getOffset(0));
|
|
|
assertEquals(BLOCKSIZE, fileSplit.getLength(0));
|
|
|
- assertEquals(file4.getName(), fileSplit.getPath(1).getName());
|
|
|
+ assertEquals(file2.getName(), fileSplit.getPath(1).getName());
|
|
|
assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
|
|
|
assertEquals(BLOCKSIZE, fileSplit.getLength(1));
|
|
|
assertEquals(file4.getName(), fileSplit.getPath(2).getName());
|
|
|
- assertEquals( 2 * BLOCKSIZE, fileSplit.getOffset(2));
|
|
|
+ assertEquals(0, fileSplit.getOffset(2));
|
|
|
assertEquals(BLOCKSIZE, fileSplit.getLength(2));
|
|
|
- assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
|
|
|
+ assertEquals("host2.rack2.com", fileSplit.getLocations()[0]);
|
|
|
fileSplit = (CombineFileSplit) splits.get(2);
|
|
|
- assertEquals(2, fileSplit.getNumPaths());
|
|
|
+ assertEquals(3, fileSplit.getNumPaths());
|
|
|
assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(file2.getName(), fileSplit.getPath(0).getName());
|
|
|
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
|
|
|
assertEquals(0, fileSplit.getOffset(0));
|
|
|
assertEquals(BLOCKSIZE, fileSplit.getLength(0));
|
|
|
- assertEquals(file2.getName(), fileSplit.getPath(1).getName());
|
|
|
+ assertEquals(file4.getName(), fileSplit.getPath(1).getName());
|
|
|
assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
|
|
|
assertEquals(BLOCKSIZE, fileSplit.getLength(1));
|
|
|
- assertEquals("host2.rack2.com", fileSplit.getLocations()[0]);
|
|
|
- fileSplit = (CombineFileSplit) splits.get(3);
|
|
|
- assertEquals(1, fileSplit.getNumPaths());
|
|
|
- assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
|
|
|
- assertEquals(0, fileSplit.getOffset(0));
|
|
|
- assertEquals(BLOCKSIZE, fileSplit.getLength(0));
|
|
|
+ assertEquals(file4.getName(), fileSplit.getPath(2).getName());
|
|
|
+ assertEquals(2*BLOCKSIZE, fileSplit.getOffset(2));
|
|
|
+ assertEquals(BLOCKSIZE, fileSplit.getLength(2));
|
|
|
assertEquals("host1.rack1.com", fileSplit.getLocations()[0]);
|
|
|
|
|
|
// maximum split size is 4 blocks
|
|
@@ -713,6 +716,56 @@ public class TestCombineFileInputFormat extends TestCase {
|
|
|
DFSTestUtil.waitReplication(fileSys, name, replication);
|
|
|
}
|
|
|
|
|
|
+ public void testNodeInputSplit() throws IOException, InterruptedException {
|
|
|
+ // Regression test for MAPREDUCE-4892. There are 2 nodes, and every block is
|
|
|
+ // located on both of them. The grouping must hand splits to both nodes
|
|
|
+ // instead of assigning all of them to just the first node.
|
|
|
+ DummyInputFormat inFormat = new DummyInputFormat();
|
|
|
+ int numBlocks = 12; // with blockSize = 100 below, totLength ends up as 1200
|
|
|
+ long totLength = 0; // accumulated in the block-creation loop
|
|
|
+ long blockSize = 100;
|
|
|
+ long maxSize = 200; // every resulting split is asserted to be exactly this long
|
|
|
+ long minSizeNode = 50;
|
|
|
+ long minSizeRack = 50;
|
|
|
+ String[] locations = { "h1", "h2" }; // both hosts are listed for every block
|
|
|
+ String[] racks = new String[0]; // an empty racks array is passed for every block
|
|
|
+ Path path = new Path("hdfs://file");
|
|
|
+ // Create numBlocks consecutive blocks of the same path, all sharing the same locations.
|
|
|
+ OneBlockInfo[] blocks = new OneBlockInfo[numBlocks];
|
|
|
+ for(int i=0; i<numBlocks; ++i) {
|
|
|
+ blocks[i] = new OneBlockInfo(path, i*blockSize, blockSize, locations, racks);
|
|
|
+ totLength += blockSize;
|
|
|
+ }
|
|
|
+ // Output list for the splits, plus the lookup maps handed to populateBlockInfo and createSplits.
|
|
|
+ List<InputSplit> splits = new ArrayList<InputSplit>();
|
|
|
+ HashMap<String, Set<String>> rackToNodes =
|
|
|
+ new HashMap<String, Set<String>>();
|
|
|
+ HashMap<String, List<OneBlockInfo>> rackToBlocks =
|
|
|
+ new HashMap<String, List<OneBlockInfo>>();
|
|
|
+ HashMap<OneBlockInfo, String[]> blockToNodes =
|
|
|
+ new HashMap<OneBlockInfo, String[]>();
|
|
|
+ HashMap<String, List<OneBlockInfo>> nodeToBlocks =
|
|
|
+ new HashMap<String, List<OneBlockInfo>>();
|
|
|
+ // Register the blocks in the maps (presumably populateBlockInfo fills them; confirm in CombineFileInputFormat).
|
|
|
+ OneFileInfo.populateBlockInfo(blocks, rackToBlocks, blockToNodes,
|
|
|
+ nodeToBlocks, rackToNodes);
|
|
|
+ // Run the grouping under test; the generated splits are collected in 'splits'.
|
|
|
+ inFormat.createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,
|
|
|
+ maxSize, minSizeNode, minSizeRack, splits);
|
|
|
+ // Expect totLength / maxSize = 1200 / 200 = 6 splits, each maxSize long with exactly one location.
|
|
|
+ int expectedSplitCount = (int)(totLength/maxSize);
|
|
|
+ Assert.assertEquals(expectedSplitCount, splits.size());
|
|
|
+ HashMultiset<String> nodeSplits = HashMultiset.create(); // counts splits per location
|
|
|
+ for(int i=0; i<expectedSplitCount; ++i) {
|
|
|
+ InputSplit inSplit = splits.get(i);
|
|
|
+ Assert.assertEquals(maxSize, inSplit.getLength());
|
|
|
+ Assert.assertEquals(1, inSplit.getLocations().length);
|
|
|
+ nodeSplits.add(inSplit.getLocations()[0]);
|
|
|
+ }
|
|
|
+ Assert.assertEquals(3, nodeSplits.count(locations[0])); // the 6 splits must be shared evenly: 3 on h1
|
|
|
+ Assert.assertEquals(3, nodeSplits.count(locations[1])); // ... and 3 on h2
|
|
|
+ }
|
|
|
+
|
|
|
public void testSplitPlacementForCompressedFiles() throws Exception {
|
|
|
MiniDFSCluster dfs = null;
|
|
|
FileSystem fileSys = null;
|
|
@@ -889,24 +942,24 @@ public class TestCombineFileInputFormat extends TestCase {
|
|
|
assertEquals(f3.getLen(), fileSplit.getLength(0));
|
|
|
assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
|
|
|
fileSplit = (CombineFileSplit) splits.get(1);
|
|
|
- assertEquals(file4.getName(), fileSplit.getPath(0).getName());
|
|
|
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
|
|
|
assertEquals(0, fileSplit.getOffset(0));
|
|
|
- assertEquals(f4.getLen(), fileSplit.getLength(0));
|
|
|
- assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
|
|
|
+ assertEquals(f2.getLen(), fileSplit.getLength(0));
|
|
|
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
|
|
|
fileSplit = (CombineFileSplit) splits.get(2);
|
|
|
assertEquals(1, fileSplit.getNumPaths());
|
|
|
assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(file2.getName(), fileSplit.getPath(0).getName());
|
|
|
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
|
|
|
assertEquals(0, fileSplit.getOffset(0));
|
|
|
- assertEquals(f2.getLen(), fileSplit.getLength(0));
|
|
|
- assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
|
|
|
+ assertEquals(f1.getLen(), fileSplit.getLength(0));
|
|
|
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
|
|
|
fileSplit = (CombineFileSplit) splits.get(3);
|
|
|
assertEquals(1, fileSplit.getNumPaths());
|
|
|
assertEquals(1, fileSplit.getLocations().length);
|
|
|
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
|
|
|
+ assertEquals(file4.getName(), fileSplit.getPath(0).getName());
|
|
|
assertEquals(0, fileSplit.getOffset(0));
|
|
|
- assertEquals(f1.getLen(), fileSplit.getLength(0));
|
|
|
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
|
|
|
+ assertEquals(f4.getLen(), fileSplit.getLength(0));
|
|
|
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
|
|
|
|
|
|
// maximum split size is twice file1's length
|
|
|
inFormat = new DummyInputFormat();
|