|
@@ -15,36 +15,156 @@
|
|
* See the License for the specific language governing permissions and
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
* limitations under the License.
|
|
*/
|
|
*/
|
|
-package org.apache.hadoop.mapred;
|
|
|
|
|
|
+package org.apache.hadoop.mapreduce.task.reduce;
|
|
|
|
|
|
import static org.mockito.Matchers.any;
|
|
import static org.mockito.Matchers.any;
|
|
import static org.mockito.Mockito.mock;
|
|
import static org.mockito.Mockito.mock;
|
|
import static org.mockito.Mockito.when;
|
|
import static org.mockito.Mockito.when;
|
|
import static org.mockito.Mockito.doAnswer;
|
|
import static org.mockito.Mockito.doAnswer;
|
|
|
|
|
|
|
|
+import java.io.ByteArrayOutputStream;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
|
|
+import java.util.Arrays;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
+import java.util.Map;
|
|
|
|
+import java.util.TreeMap;
|
|
|
|
|
|
import junit.framework.Assert;
|
|
import junit.framework.Assert;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
|
+import org.apache.hadoop.fs.FSDataOutputStream;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
|
+import org.apache.hadoop.fs.LocalDirAllocator;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.io.DataInputBuffer;
|
|
import org.apache.hadoop.io.DataInputBuffer;
|
|
import org.apache.hadoop.io.RawComparator;
|
|
import org.apache.hadoop.io.RawComparator;
|
|
import org.apache.hadoop.io.Text;
|
|
import org.apache.hadoop.io.Text;
|
|
import org.apache.hadoop.mapred.Counters.Counter;
|
|
import org.apache.hadoop.mapred.Counters.Counter;
|
|
import org.apache.hadoop.mapred.IFile.Reader;
|
|
import org.apache.hadoop.mapred.IFile.Reader;
|
|
|
|
+import org.apache.hadoop.mapred.IFile;
|
|
|
|
+import org.apache.hadoop.mapred.JobConf;
|
|
|
|
+import org.apache.hadoop.mapred.MROutputFiles;
|
|
|
|
+import org.apache.hadoop.mapred.Merger;
|
|
import org.apache.hadoop.mapred.Merger.Segment;
|
|
import org.apache.hadoop.mapred.Merger.Segment;
|
|
|
|
+import org.apache.hadoop.mapred.RawKeyValueIterator;
|
|
|
|
+import org.apache.hadoop.mapred.Reporter;
|
|
|
|
+import org.apache.hadoop.mapreduce.JobID;
|
|
|
|
+import org.apache.hadoop.mapreduce.MRConfig;
|
|
|
|
+import org.apache.hadoop.mapreduce.TaskAttemptID;
|
|
|
|
+import org.apache.hadoop.mapreduce.TaskID;
|
|
|
|
+import org.apache.hadoop.mapreduce.TaskType;
|
|
|
|
+import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl;
|
|
import org.apache.hadoop.util.Progress;
|
|
import org.apache.hadoop.util.Progress;
|
|
import org.apache.hadoop.util.Progressable;
|
|
import org.apache.hadoop.util.Progressable;
|
|
|
|
+import org.junit.After;
|
|
|
|
+import org.junit.Before;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
import org.mockito.invocation.InvocationOnMock;
|
|
import org.mockito.invocation.InvocationOnMock;
|
|
import org.mockito.stubbing.Answer;
|
|
import org.mockito.stubbing.Answer;
|
|
|
|
|
|
public class TestMerger {
|
|
public class TestMerger {
|
|
|
|
|
|
|
|
+ private Configuration conf;
|
|
|
|
+ private JobConf jobConf;
|
|
|
|
+ private FileSystem fs;
|
|
|
|
+
|
|
|
|
+ @Before
|
|
|
|
+ public void setup() throws IOException {
|
|
|
|
+ conf = new Configuration();
|
|
|
|
+ jobConf = new JobConf();
|
|
|
|
+ fs = FileSystem.getLocal(conf);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @After
|
|
|
|
+ public void cleanup() throws IOException {
|
|
|
|
+ fs.delete(new Path(jobConf.getLocalDirs()[0]), true);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testInMemoryMerger() throws IOException {
|
|
|
|
+ JobID jobId = new JobID("a", 0);
|
|
|
|
+ TaskAttemptID reduceId = new TaskAttemptID(
|
|
|
|
+ new TaskID(jobId, TaskType.REDUCE, 0), 0);
|
|
|
|
+ TaskAttemptID mapId1 = new TaskAttemptID(
|
|
|
|
+ new TaskID(jobId, TaskType.MAP, 1), 0);
|
|
|
|
+ TaskAttemptID mapId2 = new TaskAttemptID(
|
|
|
|
+ new TaskID(jobId, TaskType.MAP, 2), 0);
|
|
|
|
+
|
|
|
|
+ LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);
|
|
|
|
+
|
|
|
|
+ MergeManagerImpl<Text, Text> mergeManager = new MergeManagerImpl<Text, Text>(
|
|
|
|
+ reduceId, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
|
|
|
|
+ null, null, new Progress(), new MROutputFiles());
|
|
|
|
+
|
|
|
|
+ // write map outputs
|
|
|
|
+ Map<String, String> map1 = new TreeMap<String, String>();
|
|
|
|
+ map1.put("apple", "disgusting");
|
|
|
|
+ map1.put("carrot", "delicious");
|
|
|
|
+ Map<String, String> map2 = new TreeMap<String, String>();
|
|
|
|
+ map1.put("banana", "pretty good");
|
|
|
|
+ byte[] mapOutputBytes1 = writeMapOutput(conf, map1);
|
|
|
|
+ byte[] mapOutputBytes2 = writeMapOutput(conf, map2);
|
|
|
|
+ InMemoryMapOutput<Text, Text> mapOutput1 = new InMemoryMapOutput<Text, Text>(
|
|
|
|
+ conf, mapId1, mergeManager, mapOutputBytes1.length, null, true);
|
|
|
|
+ InMemoryMapOutput<Text, Text> mapOutput2 = new InMemoryMapOutput<Text, Text>(
|
|
|
|
+ conf, mapId2, mergeManager, mapOutputBytes2.length, null, true);
|
|
|
|
+ System.arraycopy(mapOutputBytes1, 0, mapOutput1.getMemory(), 0,
|
|
|
|
+ mapOutputBytes1.length);
|
|
|
|
+ System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0,
|
|
|
|
+ mapOutputBytes2.length);
|
|
|
|
+
|
|
|
|
+ // create merger and run merge
|
|
|
|
+ MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger =
|
|
|
|
+ mergeManager.createInMemoryMerger();
|
|
|
|
+ List<InMemoryMapOutput<Text, Text>> mapOutputs =
|
|
|
|
+ new ArrayList<InMemoryMapOutput<Text, Text>>();
|
|
|
|
+ mapOutputs.add(mapOutput1);
|
|
|
|
+ mapOutputs.add(mapOutput2);
|
|
|
|
+
|
|
|
|
+ inMemoryMerger.merge(mapOutputs);
|
|
|
|
+
|
|
|
|
+ Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
|
|
|
|
+ Path outPath = mergeManager.onDiskMapOutputs.iterator().next();
|
|
|
|
+
|
|
|
|
+ List<String> keys = new ArrayList<String>();
|
|
|
|
+ List<String> values = new ArrayList<String>();
|
|
|
|
+ readOnDiskMapOutput(conf, fs, outPath, keys, values);
|
|
|
|
+ Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot"));
|
|
|
|
+ Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious"));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private byte[] writeMapOutput(Configuration conf, Map<String, String> keysToValues)
|
|
|
|
+ throws IOException {
|
|
|
|
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
|
|
|
+ FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
|
|
|
|
+ IFile.Writer<Text, Text> writer = new IFile.Writer<Text, Text>(conf, fsdos,
|
|
|
|
+ Text.class, Text.class, null, null);
|
|
|
|
+ for (String key : keysToValues.keySet()) {
|
|
|
|
+ String value = keysToValues.get(key);
|
|
|
|
+ writer.append(new Text(key), new Text(value));
|
|
|
|
+ }
|
|
|
|
+ writer.close();
|
|
|
|
+ return baos.toByteArray();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
|
|
|
|
+ List<String> keys, List<String> values) throws IOException {
|
|
|
|
+ IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, fs,
|
|
|
|
+ path, null, null);
|
|
|
|
+ DataInputBuffer keyBuff = new DataInputBuffer();
|
|
|
|
+ DataInputBuffer valueBuff = new DataInputBuffer();
|
|
|
|
+ Text key = new Text();
|
|
|
|
+ Text value = new Text();
|
|
|
|
+ while (reader.nextRawKey(keyBuff)) {
|
|
|
|
+ key.readFields(keyBuff);
|
|
|
|
+ keys.add(key.toString());
|
|
|
|
+ reader.nextRawValue(valueBuff);
|
|
|
|
+ value.readFields(valueBuff);
|
|
|
|
+ values.add(value.toString());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
@Test
|
|
@Test
|
|
public void testCompressed() throws IOException {
|
|
public void testCompressed() throws IOException {
|
|
testMergeShouldReturnProperProgress(getCompressedSegments());
|
|
testMergeShouldReturnProperProgress(getCompressedSegments());
|
|
@@ -58,9 +178,6 @@ public class TestMerger {
|
|
@SuppressWarnings( { "deprecation", "unchecked" })
|
|
@SuppressWarnings( { "deprecation", "unchecked" })
|
|
public void testMergeShouldReturnProperProgress(
|
|
public void testMergeShouldReturnProperProgress(
|
|
List<Segment<Text, Text>> segments) throws IOException {
|
|
List<Segment<Text, Text>> segments) throws IOException {
|
|
- Configuration conf = new Configuration();
|
|
|
|
- JobConf jobConf = new JobConf();
|
|
|
|
- FileSystem fs = FileSystem.getLocal(conf);
|
|
|
|
Path tmpDir = new Path("localpath");
|
|
Path tmpDir = new Path("localpath");
|
|
Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
|
|
Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
|
|
Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
|
|
Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
|
|
@@ -87,7 +204,6 @@ public class TestMerger {
|
|
List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
|
|
List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
|
|
for (int i = 1; i < 1; i++) {
|
|
for (int i = 1; i < 1; i++) {
|
|
segments.add(getUncompressedSegment(i));
|
|
segments.add(getUncompressedSegment(i));
|
|
- System.out.println("adding segment");
|
|
|
|
}
|
|
}
|
|
return segments;
|
|
return segments;
|
|
}
|
|
}
|
|
@@ -96,7 +212,6 @@ public class TestMerger {
|
|
List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
|
|
List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
|
|
for (int i = 1; i < 1; i++) {
|
|
for (int i = 1; i < 1; i++) {
|
|
segments.add(getCompressedSegment(i));
|
|
segments.add(getCompressedSegment(i));
|
|
- System.out.println("adding segment");
|
|
|
|
}
|
|
}
|
|
return segments;
|
|
return segments;
|
|
}
|
|
}
|
|
@@ -133,7 +248,7 @@ public class TestMerger {
|
|
if (i++ == 2) {
|
|
if (i++ == 2) {
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
- key.reset(("Segement Key " + segmentName + i).getBytes(), 20);
|
|
|
|
|
|
+ key.reset(("Segment Key " + segmentName + i).getBytes(), 20);
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
};
|
|
};
|
|
@@ -149,7 +264,7 @@ public class TestMerger {
|
|
if (i++ == 2) {
|
|
if (i++ == 2) {
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
- key.reset(("Segement Value " + segmentName + i).getBytes(), 20);
|
|
|
|
|
|
+ key.reset(("Segment Value " + segmentName + i).getBytes(), 20);
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
};
|
|
};
|