@@ -17,28 +17,38 @@
  */
 package org.apache.hadoop.mapreduce.task.reduce;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MROutputFiles;
 import org.apache.hadoop.mapred.MapOutputFile;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 
 public class TestMergeManager {
 
   @Test(timeout=10000)
-  @SuppressWarnings("unchecked")
   public void testMemoryMerge() throws Exception {
     final int TOTAL_MEM_BYTES = 10000;
     final int OUTPUT_SIZE = 7950;
@@ -195,4 +205,66 @@ public class TestMergeManager {
       return exceptions.size();
     }
   }
+
+  @SuppressWarnings({ "unchecked", "deprecation" })
+  @Test(timeout=10000)
+  public void testOnDiskMerger() throws IOException, URISyntaxException,
+    InterruptedException {
+    JobConf jobConf = new JobConf();
+    final int SORT_FACTOR = 5;
+    jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, SORT_FACTOR);
+
+    MapOutputFile mapOutputFile = new MROutputFiles();
+    FileSystem fs = FileSystem.getLocal(jobConf);
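+    // The test only drives the on-disk merge path, so collaborators it never
+    // touches (reporter, codec, combiner, counters, etc.) are passed as null.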
+    MergeManagerImpl<IntWritable, IntWritable> manager =
+        new MergeManagerImpl<IntWritable, IntWritable>(null, jobConf, fs, null,
+            null, null, null, null, null, null, null, null, null, mapOutputFile);
+
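+    // Use reflection (Whitebox) to reach the internal on-disk merger thread
+    // and the merge factor it was configured with.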
+    MergeThread<MapOutput<IntWritable, IntWritable>, IntWritable, IntWritable>
+        onDiskMerger = (MergeThread<MapOutput<IntWritable, IntWritable>,
+            IntWritable, IntWritable>) Whitebox.getInternalState(manager,
+            "onDiskMerger");
+    int mergeFactor = (Integer) Whitebox.getInternalState(onDiskMerger,
+        "mergeFactor");
+
+    // Make sure io.sort.factor was picked up properly.
+    assertEquals(SORT_FACTOR, mergeFactor);
+
+    // Stop the onDiskMerger thread so that we can intercept the list of files
+    // waiting to be merged.
+    onDiskMerger.suspend();
+
+    // Send a list of fake files waiting to be merged.
+    Random rand = new Random();
+    for (int i = 0; i < 2 * SORT_FACTOR; ++i) {
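+      // Random compressed sizes make the size-ordering check below meaningful.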
+      Path path = new Path("somePath");
+      CompressAwarePath cap = new CompressAwarePath(path, 1L, rand.nextInt());
+      manager.closeOnDiskFile(cap);
+    }
+
+    // Check that the files pending to be merged are in sorted order.
+    LinkedList<List<CompressAwarePath>> pendingToBeMerged =
+        (LinkedList<List<CompressAwarePath>>) Whitebox.getInternalState(
+            onDiskMerger, "pendingToBeMerged");
+    assertTrue("No inputs were added to list pending to merge",
+        pendingToBeMerged.size() > 0);
+    for (int i = 0; i < pendingToBeMerged.size(); ++i) {
+      List<CompressAwarePath> inputs = pendingToBeMerged.get(i);
+      for (int j = 1; j < inputs.size(); ++j) {
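+        // Every batch must stay within the merge factor, and each input must
+        // be at least as large as its predecessor (sorted by compressed size).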
+        assertTrue("Not enough / too many inputs were going to be merged",
+            inputs.size() > 0 && inputs.size() <= SORT_FACTOR);
+        assertTrue("Inputs to be merged were not sorted according to size",
+            inputs.get(j).getCompressedSize()
+                >= inputs.get(j - 1).getCompressedSize());
+      }
+    }
+
+  }
 }