|
@@ -33,8 +33,6 @@ import org.apache.hadoop.tools.CopyListing;
|
|
import org.apache.hadoop.tools.DistCpOptions;
|
|
import org.apache.hadoop.tools.DistCpOptions;
|
|
import org.apache.hadoop.tools.StubContext;
|
|
import org.apache.hadoop.tools.StubContext;
|
|
import org.apache.hadoop.security.Credentials;
|
|
import org.apache.hadoop.security.Credentials;
|
|
-import org.apache.commons.logging.Log;
|
|
|
|
-import org.apache.commons.logging.LogFactory;
|
|
|
|
import org.junit.AfterClass;
|
|
import org.junit.AfterClass;
|
|
import org.junit.Assert;
|
|
import org.junit.Assert;
|
|
import org.junit.BeforeClass;
|
|
import org.junit.BeforeClass;
|
|
@@ -48,9 +46,6 @@ import java.util.Random;
|
|
|
|
|
|
|
|
|
|
public class TestUniformSizeInputFormat {
|
|
public class TestUniformSizeInputFormat {
|
|
- private static final Log LOG
|
|
|
|
- = LogFactory.getLog(TestUniformSizeInputFormat.class);
|
|
|
|
-
|
|
|
|
private static MiniDFSCluster cluster;
|
|
private static MiniDFSCluster cluster;
|
|
private static final int N_FILES = 20;
|
|
private static final int N_FILES = 20;
|
|
private static final int SIZEOF_EACH_FILE=1024;
|
|
private static final int SIZEOF_EACH_FILE=1024;
|
|
@@ -118,12 +113,9 @@ public class TestUniformSizeInputFormat {
|
|
List<InputSplit> splits
|
|
List<InputSplit> splits
|
|
= uniformSizeInputFormat.getSplits(jobContext);
|
|
= uniformSizeInputFormat.getSplits(jobContext);
|
|
|
|
|
|
- List<InputSplit> legacySplits = legacyGetSplits(listFile, nMaps);
|
|
|
|
-
|
|
|
|
int sizePerMap = totalFileSize/nMaps;
|
|
int sizePerMap = totalFileSize/nMaps;
|
|
|
|
|
|
checkSplits(listFile, splits);
|
|
checkSplits(listFile, splits);
|
|
- checkAgainstLegacy(splits, legacySplits);
|
|
|
|
|
|
|
|
int doubleCheckedTotalSize = 0;
|
|
int doubleCheckedTotalSize = 0;
|
|
int previousSplitSize = -1;
|
|
int previousSplitSize = -1;
|
|
@@ -155,57 +147,6 @@ public class TestUniformSizeInputFormat {
|
|
Assert.assertEquals(totalFileSize, doubleCheckedTotalSize);
|
|
Assert.assertEquals(totalFileSize, doubleCheckedTotalSize);
|
|
}
|
|
}
|
|
|
|
|
|
- // From
|
|
|
|
- // http://svn.apache.org/repos/asf/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
|
|
|
|
- private List<InputSplit> legacyGetSplits(Path listFile, int numSplits)
|
|
|
|
- throws IOException {
|
|
|
|
-
|
|
|
|
- FileSystem fs = cluster.getFileSystem();
|
|
|
|
- FileStatus srcst = fs.getFileStatus(listFile);
|
|
|
|
- Configuration conf = fs.getConf();
|
|
|
|
-
|
|
|
|
- ArrayList<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
|
|
|
|
- FileStatus value = new FileStatus();
|
|
|
|
- Text key = new Text();
|
|
|
|
- final long targetsize = totalFileSize / numSplits;
|
|
|
|
- long pos = 0L;
|
|
|
|
- long last = 0L;
|
|
|
|
- long acc = 0L;
|
|
|
|
- long cbrem = srcst.getLen();
|
|
|
|
- SequenceFile.Reader sl = null;
|
|
|
|
-
|
|
|
|
- LOG.info("Average bytes per map: " + targetsize +
|
|
|
|
- ", Number of maps: " + numSplits + ", total size: " + totalFileSize);
|
|
|
|
-
|
|
|
|
- try {
|
|
|
|
- sl = new SequenceFile.Reader(conf, SequenceFile.Reader.file(listFile));
|
|
|
|
- for (; sl.next(key, value); last = sl.getPosition()) {
|
|
|
|
- // if adding this split would put this split past the target size,
|
|
|
|
- // cut the last split and put this next file in the next split.
|
|
|
|
- if (acc + value.getLen() > targetsize && acc != 0) {
|
|
|
|
- long splitsize = last - pos;
|
|
|
|
- FileSplit fileSplit = new FileSplit(listFile, pos, splitsize, null);
|
|
|
|
- LOG.info ("Creating split : " + fileSplit + ", bytes in split: " + splitsize);
|
|
|
|
- splits.add(fileSplit);
|
|
|
|
- cbrem -= splitsize;
|
|
|
|
- pos = last;
|
|
|
|
- acc = 0L;
|
|
|
|
- }
|
|
|
|
- acc += value.getLen();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- finally {
|
|
|
|
- IOUtils.closeStream(sl);
|
|
|
|
- }
|
|
|
|
- if (cbrem != 0) {
|
|
|
|
- FileSplit fileSplit = new FileSplit(listFile, pos, cbrem, null);
|
|
|
|
- LOG.info ("Creating split : " + fileSplit + ", bytes in split: " + cbrem);
|
|
|
|
- splits.add(fileSplit);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- return splits;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
private void checkSplits(Path listFile, List<InputSplit> splits) throws IOException {
|
|
private void checkSplits(Path listFile, List<InputSplit> splits) throws IOException {
|
|
long lastEnd = 0;
|
|
long lastEnd = 0;
|
|
|
|
|
|
@@ -233,18 +174,6 @@ public class TestUniformSizeInputFormat {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private void checkAgainstLegacy(List<InputSplit> splits,
|
|
|
|
- List<InputSplit> legacySplits)
|
|
|
|
- throws IOException, InterruptedException {
|
|
|
|
-
|
|
|
|
- Assert.assertEquals(legacySplits.size(), splits.size());
|
|
|
|
- for (int index = 0; index < splits.size(); index++) {
|
|
|
|
- FileSplit fileSplit = (FileSplit) splits.get(index);
|
|
|
|
- FileSplit legacyFileSplit = (FileSplit) legacySplits.get(index);
|
|
|
|
- Assert.assertEquals(fileSplit.getStart(), legacyFileSplit.getStart());
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
@Test
|
|
@Test
|
|
public void testGetSplits() throws Exception {
|
|
public void testGetSplits() throws Exception {
|
|
testGetSplits(9);
|
|
testGetSplits(9);
|