|
@@ -22,13 +22,12 @@ import java.io.IOException;
|
|
|
import java.util.Date;
|
|
|
import java.util.Random;
|
|
|
|
|
|
-import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.io.*;
|
|
|
-import org.apache.hadoop.io.SequenceFile.CompressionType;
|
|
|
import org.apache.hadoop.mapred.*;
|
|
|
import org.apache.hadoop.mapred.lib.IdentityReducer;
|
|
|
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
|
|
|
+import org.apache.hadoop.util.ToolBase;
|
|
|
|
|
|
/**
|
|
|
* This program uses map/reduce to just run a distributed job where there is
|
|
@@ -61,8 +60,11 @@ import org.apache.hadoop.mapred.lib.NullOutputFormat;
|
|
|
* <value>1099511627776</value>
|
|
|
* </property>
|
|
|
* </configuration></xmp>
|
|
|
+ *
|
|
|
+ * Equivalently, {@link RandomWriter} also accepts all of the above options,
|
|
|
+ * as well as those supported by {@link ToolBase}, on the command line.
|
|
|
*/
|
|
|
-public class RandomWriter {
|
|
|
+public class RandomWriter extends ToolBase {
|
|
|
|
|
|
/**
|
|
|
* User counters
|
|
@@ -88,7 +90,7 @@ public class RandomWriter {
|
|
|
InputSplit[] result = new InputSplit[numSplits];
|
|
|
Path outDir = job.getOutputPath();
|
|
|
for(int i=0; i < result.length; ++i) {
|
|
|
- result[i] = new FileSplit(new Path(outDir, "part-" + i), 0, 1, job);
|
|
|
+ result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, job);
|
|
|
}
|
|
|
return result;
|
|
|
}
|
|
@@ -133,8 +135,6 @@ public class RandomWriter {
|
|
|
}
|
|
|
|
|
|
static class Map extends MapReduceBase implements Mapper {
|
|
|
- private FileSystem fileSys = null;
|
|
|
- private JobConf jobConf = null;
|
|
|
private long numBytesToWrite;
|
|
|
private int minKeySize;
|
|
|
private int keySizeRange;
|
|
@@ -143,7 +143,6 @@ public class RandomWriter {
|
|
|
private Random random = new Random();
|
|
|
private BytesWritable randomKey = new BytesWritable();
|
|
|
private BytesWritable randomValue = new BytesWritable();
|
|
|
- private Path outputDir = null;
|
|
|
|
|
|
private void randomizeBytes(byte[] data, int offset, int length) {
|
|
|
for(int i=offset + length - 1; i >= offset; --i) {
|
|
@@ -158,12 +157,6 @@ public class RandomWriter {
|
|
|
Writable value,
|
|
|
OutputCollector output,
|
|
|
Reporter reporter) throws IOException {
|
|
|
- String filename = ((Text) key).toString();
|
|
|
- SequenceFile.Writer writer =
|
|
|
- SequenceFile.createWriter(fileSys, jobConf,
|
|
|
- new Path(outputDir, filename),
|
|
|
- BytesWritable.class, BytesWritable.class,
|
|
|
- CompressionType.NONE, reporter);
|
|
|
int itemCount = 0;
|
|
|
while (numBytesToWrite > 0) {
|
|
|
int keyLength = minKeySize +
|
|
@@ -174,7 +167,7 @@ public class RandomWriter {
|
|
|
(valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
|
|
|
randomValue.setSize(valueLength);
|
|
|
randomizeBytes(randomValue.get(), 0, randomValue.getSize());
|
|
|
- writer.append(randomKey, randomValue);
|
|
|
+ output.collect(randomKey, randomValue);
|
|
|
numBytesToWrite -= keyLength + valueLength;
|
|
|
reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength);
|
|
|
reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
|
|
@@ -184,7 +177,6 @@ public class RandomWriter {
|
|
|
}
|
|
|
}
|
|
|
reporter.setStatus("done with " + itemCount + " records.");
|
|
|
- writer.close();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -192,14 +184,6 @@ public class RandomWriter {
|
|
|
* the data.
|
|
|
*/
|
|
|
public void configure(JobConf job) {
|
|
|
- jobConf = job;
|
|
|
- try {
|
|
|
- fileSys = FileSystem.get(job);
|
|
|
- } catch (IOException e) {
|
|
|
- throw new RuntimeException("Can't get default file system", e);
|
|
|
- }
|
|
|
- outputDir = job.getOutputPath();
|
|
|
-
|
|
|
numBytesToWrite = job.getLong("test.randomwrite.bytes_per_map",
|
|
|
1*1024*1024*1024);
|
|
|
minKeySize = job.getInt("test.randomwrite.min_key", 10);
|
|
@@ -219,18 +203,15 @@ public class RandomWriter {
|
|
|
*
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public static void main(String[] args) throws IOException {
|
|
|
+ public int run(String[] args) throws Exception {
|
|
|
if (args.length == 0) {
|
|
|
System.out.println("Usage: writer <out-dir> [<config>]");
|
|
|
- return;
|
|
|
+ return -1;
|
|
|
}
|
|
|
+
|
|
|
Path outDir = new Path(args[0]);
|
|
|
- JobConf job;
|
|
|
- if (args.length >= 2) {
|
|
|
- job = new JobConf(new Path(args[1]));
|
|
|
- } else {
|
|
|
- job = new JobConf();
|
|
|
- }
|
|
|
+ JobConf job = new JobConf(conf);
|
|
|
+
|
|
|
job.setJarByClass(RandomWriter.class);
|
|
|
job.setJobName("random-writer");
|
|
|
job.setOutputPath(outDir);
|
|
@@ -241,7 +222,7 @@ public class RandomWriter {
|
|
|
job.setInputFormat(RandomInputFormat.class);
|
|
|
job.setMapperClass(Map.class);
|
|
|
job.setReducerClass(IdentityReducer.class);
|
|
|
- job.setOutputFormat(NullOutputFormat.class);
|
|
|
+ job.setOutputFormat(SequenceFileOutputFormat.class);
|
|
|
|
|
|
JobClient client = new JobClient(job);
|
|
|
ClusterStatus cluster = client.getClusterStatus();
|
|
@@ -250,7 +231,7 @@ public class RandomWriter {
|
|
|
1*1024*1024*1024);
|
|
|
if (numBytesToWritePerMap == 0) {
|
|
|
System.err.println("Cannot have test.randomwrite.bytes_per_map set to 0");
|
|
|
- System.exit(-1);
|
|
|
+ return -2;
|
|
|
}
|
|
|
long totalBytesToWrite = job.getLong("test.randomwrite.total_bytes",
|
|
|
numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
|
|
@@ -262,7 +243,9 @@ public class RandomWriter {
|
|
|
|
|
|
job.setNumMapTasks(numMaps);
|
|
|
System.out.println("Running " + numMaps + " maps.");
|
|
|
- job.setNumReduceTasks(1);
|
|
|
+
|
|
|
+ // Map-only job: no reduce tasks are run.
|
|
|
+ job.setNumReduceTasks(0);
|
|
|
|
|
|
Date startTime = new Date();
|
|
|
System.out.println("Job started: " + startTime);
|
|
@@ -272,6 +255,13 @@ public class RandomWriter {
|
|
|
System.out.println("The job took " +
|
|
|
(endTime.getTime() - startTime.getTime()) /1000 +
|
|
|
" seconds.");
|
|
|
+
|
|
|
+ return 0;
|
|
|
}
|
|
|
|
|
|
+ public static void main(String[] args) throws Exception { // command-line entry point for RandomWriter
|
|
|
+ int res = new RandomWriter().doMain(new Configuration(), args); // doMain is not defined in this view; presumably inherited from ToolBase and dispatching to run() -- TODO confirm
|
|
|
+ System.exit(res); // use the integer returned by doMain as the process exit status
|
|
|
+ }
|
|
|
+
|
|
|
}
|