@@ -17,9 +17,12 @@
  */
 package org.apache.hadoop.mapred;

+import java.io.BufferedReader;
 import java.io.BufferedWriter;
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.util.EnumSet;
 import java.util.Iterator;
@@ -27,6 +30,8 @@ import java.util.Random;

 import junit.framework.TestCase;

+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
@@ -107,7 +112,7 @@ public class TestMapRed extends TestCase {
                     OutputCollector<IntWritable, IntWritable> out,
                     Reporter reporter) throws IOException {
       int randomVal = key.get();
-      int randomCount = key.get();
+      int randomCount = val.get();

       for (int i = 0; i < randomCount; i++) {
         out.collect(new IntWritable(Math.abs(r.nextInt())), new IntWritable(randomVal));
@@ -119,17 +124,16 @@ public class TestMapRed extends TestCase {
   /**
    */
   static class RandomGenReducer
-    implements Reducer<IntWritable, IntWritable, Text, Text> {
+    implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

     public void configure(JobConf job) {
     }

     public void reduce(IntWritable key, Iterator<IntWritable> it,
-                       OutputCollector<Text, Text> out,
+                       OutputCollector<IntWritable, IntWritable> out,
                        Reporter reporter) throws IOException {
       while (it.hasNext()) {
-        int val = it.next().get();
-        out.collect(new Text("" + val), new Text(""));
+        out.collect(it.next(), null);
       }
     }
     public void close() {
@@ -508,6 +512,7 @@ public class TestMapRed extends TestCase {
     } finally {
       out.close();
     }
+    //printFiles(randomIns, conf);

     //
     // Now we need to generate the random numbers according to
@@ -544,6 +549,7 @@ public class TestMapRed extends TestCase {
     genJob.setNumReduceTasks(1);

     JobClient.runJob(genJob);
+    //printFiles(randomOuts, conf);

     //
     // Next, we read the big file in and regenerate the
@@ -589,6 +595,7 @@ public class TestMapRed extends TestCase {
     checkJob.setNumReduceTasks(intermediateReduces);

     JobClient.runJob(checkJob);
+    //printFiles(intermediateOuts, conf);

     //
     // OK, now we take the output from the last job and
@@ -597,7 +604,7 @@ public class TestMapRed extends TestCase {
     // But by having a single reduce task here, we end up merging
     // all the files.
     //
-    Path finalOuts = new Path(testdir, "finalouts");
+    Path finalOuts = new Path(testdir, "finalouts");
     fs.delete(finalOuts, true);
     JobConf mergeJob = new JobConf(conf, TestMapRed.class);
     FileInputFormat.setInputPaths(mergeJob, intermediateOuts);
@@ -612,7 +619,7 @@ public class TestMapRed extends TestCase {
     mergeJob.setNumReduceTasks(1);

     JobClient.runJob(mergeJob);
-
+    //printFiles(finalOuts, conf);

     //
     // Finally, we compare the reconstructed answer key with the
@@ -636,7 +643,8 @@ public class TestMapRed extends TestCase {
           break;
         } else {
           if (!((key.get() == i) && (val.get() == dist[i]))) {
-            System.err.println("Mismatch! Pos=" + key.get() + ", i=" + i + ", val=" + val.get() + ", dist[i]=" + dist[i]);
+            System.err.println("Mismatch! Pos=" + key.get() + ", i=" + i +
+                               ", val=" + val.get() + ", dist[i]=" + dist[i]);
             success = false;
           }
           totalseen += val.get();
@@ -665,13 +673,65 @@ public class TestMapRed extends TestCase {
     BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(resultFile)));
     try {
       bw.write("Success=" + success + "\n");
-      System.out.println("Success=" + success);
+      System.out.println("Success=" + success);
     } finally {
       bw.close();
     }
+    assertTrue("testMapRed failed", success);
     fs.delete(testdir, true);
   }

+  private static void printTextFile(FileSystem fs, Path p) throws IOException {
+    BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(p)));
+    String line;
+    while ((line = in.readLine()) != null) {
+      System.out.println(" Row: " + line);
+    }
+    in.close();
+  }
+
+  private static void printSequenceFile(FileSystem fs, Path p,
+                                        Configuration conf) throws IOException {
+    SequenceFile.Reader r = new SequenceFile.Reader(fs, p, conf);
+    Object key = null;
+    Object value = null;
+    while ((key = r.next(key)) != null) {
+      value = r.getCurrentValue(value);
+      System.out.println(" Row: " + key + ", " + value);
+    }
+    r.close();
+  }
+
+  private static boolean isSequenceFile(FileSystem fs,
+                                        Path f) throws IOException {
+    DataInputStream in = fs.open(f);
+    byte[] seq = "SEQ".getBytes();
+    for(int i=0; i < seq.length; ++i) {
+      if (seq[i] != in.read()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private static void printFiles(Path dir,
+                                 Configuration conf) throws IOException {
+    FileSystem fs = dir.getFileSystem(conf);
+    for(FileStatus f: fs.listStatus(dir)) {
+      System.out.println("Reading " + f.getPath() + ": ");
+      if (f.isDir()) {
+        System.out.println(" it is a map file.");
+        printSequenceFile(fs, new Path(f.getPath(), "data"), conf);
+      } else if (isSequenceFile(fs, f.getPath())) {
+        System.out.println(" it is a sequence file.");
+        printSequenceFile(fs, f.getPath(), conf);
+      } else {
+        System.out.println(" it is a text file.");
+        printTextFile(fs, f.getPath());
+      }
+    }
+  }
+
   /**
    * Launches all the tasks in order.
    */