@@ -0,0 +1,214 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+
+/**
+ * This program uses map/reduce to run a distributed job where there is
+ * no interaction between the tasks. Each task creates filesPerMap files
+ * of bytesPerFile bytes each, closes them, then opens those files again,
+ * reads them back, and closes them. It is meant as a stress test and
+ * benchmark for the namenode.
+ *
+ * @author Owen O'Malley
+ */
+public class NNBench extends MapReduceBase implements Reducer {
+
+  public static class Map extends MapReduceBase implements Mapper {
+    private FileSystem fileSys = null;
+    private int numBytesToWrite;
+    private Random random = new Random();
+    private String taskId = null;
+    private Path topDir = null;
+
+    private void randomizeBytes(byte[] data, int offset, int length) {
+      for (int i = offset + length - 1; i >= offset; --i) {
+        data[i] = (byte) random.nextInt(256);
+      }
+    }
+
+    /**
+     * Given a number of files to create, create and write those files,
+     * then read them back and clean up.
+     */
+    public void map(WritableComparable key,
+                    Writable value,
+                    OutputCollector output,
+                    Reporter reporter) throws IOException {
+      int nFiles = ((IntWritable) value).get();
+      Path taskDir = new Path(topDir, taskId);
+      fileSys.mkdirs(taskDir);
+      byte[] buffer = new byte[32768];
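+      // Write phase: create each of the nFiles files under the task's
+      // directory and fill it with numBytesToWrite random bytes.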
+      for (int index = 0; index < nFiles; index++) {
+        FSDataOutputStream out = fileSys.create(
+            new Path(taskDir, Integer.toString(index)));
+        int toBeWritten = numBytesToWrite;
+        while (toBeWritten > 0) {
+          int nbytes = Math.min(buffer.length, toBeWritten);
+          randomizeBytes(buffer, 0, nbytes);
+          toBeWritten -= nbytes;
+          out.write(buffer, 0, nbytes);
+          reporter.setStatus("wrote " + (numBytesToWrite - toBeWritten) +
+                             " bytes for " + index + "th file.");
+        }
+        out.close();
+      }
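+      // Read phase: reopen each file and read its contents back in
+      // buffer-sized chunks.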
+      for (int index = 0; index < nFiles; index++) {
+        FSDataInputStream in = fileSys.open(
+            new Path(taskDir, Integer.toString(index)));
+        int toBeRead = numBytesToWrite;
+        while (toBeRead > 0) {
+          int nbytes = Math.min(buffer.length, toBeRead);
+          randomizeBytes(buffer, 0, nbytes);
+          toBeRead -= nbytes;
+          in.read(buffer, 0, nbytes);
+          reporter.setStatus("read " + (numBytesToWrite - toBeRead) +
+                             " bytes for " + index + "th file.");
+        }
+        in.close();
+      }
+      fileSys.delete(taskDir); // clean up after yourself
+    }
+
+    /**
+     * Save the values out of the configuration that we need to write
+     * the data.
+     */
+    public void configure(JobConf job) {
+      try {
+        fileSys = FileSystem.get(job);
+      } catch (IOException e) {
+        throw new RuntimeException("Can't get default file system", e);
+      }
+      numBytesToWrite = job.getInt("test.nnbench.bytes_per_file", 0);
+      topDir = new Path(job.get("test.nnbench.topdir", "/nnbench"));
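+      // The task id becomes this task's subdirectory name under topDir, so
+      // concurrent tasks do not collide; fall back to a random name when no
+      // task id is set (e.g. when run outside the map/reduce framework).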
+      taskId = job.get("mapred.task.id", (new Long(random.nextLong())).toString());
+    }
+
+  }
+
+  public void reduce(WritableComparable key,
+                     Iterator values,
+                     OutputCollector output,
+                     Reporter reporter) throws IOException {
+    // nothing
+  }
+
+  /**
+   * This is the main routine for launching a distributed namenode stress test.
+   * It runs test.nnbench.maps_per_host maps per tasktracker (10 by default),
+   * and each map creates filesPerMap DFS files of bytesPerFile bytes.
+   * The reduce doesn't do anything.
+   *
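+   * A typical invocation might look like the following (the output directory
+   * is only an example, and this assumes the class is launched through the
+   * bin/hadoop script):
+   *
+   *   bin/hadoop org.apache.hadoop.examples.NNBench /benchmarks/nnbench 1000 8
+   *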
+   * @throws IOException
+   */
+  public static void main(String[] args) throws IOException {
+    Configuration defaults = new Configuration();
+    if (args.length != 3) {
+      System.out.println("Usage: nnbench <out-dir> <filesPerMap> <bytesPerFile>");
+      return;
+    }
+    Path outDir = new Path(args[0]);
+    int filesPerMap = Integer.parseInt(args[1]);
+    int numBytesPerFile = Integer.parseInt(args[2]);
+
+    JobConf jobConf = new JobConf(defaults, NNBench.class);
+    jobConf.setJobName("nnbench");
+    jobConf.setInt("test.nnbench.bytes_per_file", numBytesPerFile);
+    jobConf.set("test.nnbench.topdir", args[0]);
+
+    // turn off speculative execution, because DFS doesn't handle
+    // multiple writers to the same file.
+    jobConf.setSpeculativeExecution(false);
+    jobConf.setInputKeyClass(IntWritable.class);
+    jobConf.setInputValueClass(IntWritable.class);
+    jobConf.setInputFormat(SequenceFileInputFormat.class);
+    jobConf.setOutputKeyClass(BytesWritable.class);
+    jobConf.setOutputValueClass(BytesWritable.class);
+
+    jobConf.setMapperClass(Map.class);
+    jobConf.setReducerClass(NNBench.class);
+
+    JobClient client = new JobClient(jobConf);
+    ClusterStatus cluster = client.getClusterStatus();
+    int numMaps = cluster.getTaskTrackers() *
+                  jobConf.getInt("test.nnbench.maps_per_host", 10);
+    jobConf.setNumMapTasks(numMaps);
+    System.out.println("Running " + numMaps + " maps.");
+    jobConf.setNumReduceTasks(1);
+
+    Path tmpDir = new Path("random-work");
+    Path inDir = new Path(tmpDir, "in");
+    Path fakeOutDir = new Path(tmpDir, "out");
+    FileSystem fileSys = FileSystem.get(jobConf);
+    if (fileSys.exists(outDir)) {
+      System.out.println("Error: Output directory " + outDir +
+                         " already exists.");
+      return;
+    }
+    fileSys.delete(tmpDir);
+    fileSys.mkdirs(inDir);
+
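+    // Generate one input file per map task, each containing a single record
+    // whose value is the number of files that map should create.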
+    for (int i = 0; i < numMaps; ++i) {
+      Path file = new Path(inDir, "part" + i);
+      SequenceFile.Writer writer = new SequenceFile.Writer(fileSys, file,
+          IntWritable.class, IntWritable.class, null);
+      writer.append(new IntWritable(0), new IntWritable(filesPerMap));
+      writer.close();
+    }
+    jobConf.setInputPath(inDir);
+    jobConf.setOutputPath(fakeOutDir);
+
+    // Uncomment to run locally in a single process
+    // jobConf.set("mapred.job.tracker", "local");
+
+    Date startTime = new Date();
+    System.out.println("Job started: " + startTime);
+    try {
+      JobClient.runJob(jobConf);
+      Date endTime = new Date();
+      System.out.println("Job ended: " + endTime);
+      System.out.println("The job took " +
+                         (endTime.getTime() - startTime.getTime()) / 1000 +
+                         " seconds.");
+    } finally {
+      fileSys.delete(tmpDir);
+    }
+  }
+}