|
@@ -18,19 +18,30 @@
|
|
|
|
|
|
package org.apache.hadoop.fs;
|
|
|
|
|
|
-import java.io.*;
|
|
|
-
|
|
|
-import junit.framework.TestCase;
|
|
|
+import java.io.BufferedReader;
|
|
|
+import java.io.DataInputStream;
|
|
|
+import java.io.File;
|
|
|
+import java.io.FileOutputStream;
|
|
|
+import java.io.IOException;
|
|
|
+import java.io.InputStreamReader;
|
|
|
+import java.io.OutputStream;
|
|
|
+import java.io.PrintStream;
|
|
|
import java.util.Date;
|
|
|
import java.util.StringTokenizer;
|
|
|
|
|
|
-import org.apache.commons.logging.*;
|
|
|
-
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.conf.Configured;
|
|
|
+import org.apache.hadoop.io.LongWritable;
|
|
|
+import org.apache.hadoop.io.SequenceFile;
|
|
|
+import org.apache.hadoop.io.Text;
|
|
|
+import org.apache.hadoop.io.SequenceFile.CompressionType;
|
|
|
import org.apache.hadoop.mapred.*;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
-import org.apache.hadoop.io.*;
|
|
|
-import org.apache.hadoop.io.SequenceFile.CompressionType;
|
|
|
-import org.apache.hadoop.conf.*;
|
|
|
+import org.apache.hadoop.util.Tool;
|
|
|
+import org.apache.hadoop.util.ToolRunner;
|
|
|
+import org.junit.Test;
|
|
|
|
|
|
/**
|
|
|
* Distributed i/o benchmark.
|
|
@@ -59,8 +70,9 @@ import org.apache.hadoop.conf.*;
|
|
|
* <li>standard deviation of i/o rate </li>
|
|
|
* </ul>
|
|
|
*/
|
|
|
-public class TestDFSIO extends TestCase {
|
|
|
+public class TestDFSIO extends Configured implements Tool {
|
|
|
// Constants
|
|
|
+ private static final Log LOG = LogFactory.getLog(TestDFSIO.class);
|
|
|
private static final int TEST_TYPE_READ = 0;
|
|
|
private static final int TEST_TYPE_WRITE = 1;
|
|
|
private static final int TEST_TYPE_CLEANUP = 2;
|
|
@@ -68,8 +80,6 @@ public class TestDFSIO extends TestCase {
|
|
|
private static final String BASE_FILE_NAME = "test_io_";
|
|
|
private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log";
|
|
|
|
|
|
- private static final Log LOG = FileInputFormat.LOG;
|
|
|
- private static Configuration fsConfig = new Configuration();
|
|
|
private static final long MEGA = 0x100000;
|
|
|
private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/TestDFSIO");
|
|
|
private static Path CONTROL_DIR = new Path(TEST_ROOT_DIR, "io_control");
|
|
@@ -77,13 +87,19 @@ public class TestDFSIO extends TestCase {
|
|
|
private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read");
|
|
|
private static Path DATA_DIR = new Path(TEST_ROOT_DIR, "io_data");
|
|
|
|
|
|
+ static{
|
|
|
+ Configuration.addDefaultResource("hdfs-default.xml");
|
|
|
+ Configuration.addDefaultResource("hdfs-site.xml");
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Run the test with default parameters.
|
|
|
*
|
|
|
* @throws Exception
|
|
|
*/
|
|
|
+ @Test
|
|
|
public void testIOs() throws Exception {
|
|
|
- testIOs(10, 10);
|
|
|
+ testIOs(10, 10, new Configuration());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -93,21 +109,21 @@ public class TestDFSIO extends TestCase {
|
|
|
* @param nrFiles number of files
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public static void testIOs(int fileSize, int nrFiles)
|
|
|
+ public static void testIOs(int fileSize, int nrFiles, Configuration fsConfig)
|
|
|
throws IOException {
|
|
|
|
|
|
FileSystem fs = FileSystem.get(fsConfig);
|
|
|
|
|
|
- createControlFile(fs, fileSize, nrFiles);
|
|
|
- writeTest(fs);
|
|
|
- readTest(fs);
|
|
|
+ createControlFile(fs, fileSize, nrFiles, fsConfig);
|
|
|
+ writeTest(fs, fsConfig);
|
|
|
+ readTest(fs, fsConfig);
|
|
|
cleanup(fs);
|
|
|
}
|
|
|
|
|
|
- private static void createControlFile(
|
|
|
- FileSystem fs,
|
|
|
+ private static void createControlFile(FileSystem fs,
|
|
|
int fileSize, // in MB
|
|
|
- int nrFiles
|
|
|
+ int nrFiles,
|
|
|
+ Configuration fsConfig
|
|
|
) throws IOException {
|
|
|
LOG.info("creating control file: "+fileSize+" mega bytes, "+nrFiles+" files");
|
|
|
|
|
@@ -119,9 +135,9 @@ public class TestDFSIO extends TestCase {
|
|
|
SequenceFile.Writer writer = null;
|
|
|
try {
|
|
|
writer = SequenceFile.createWriter(fs, fsConfig, controlFile,
|
|
|
- UTF8.class, LongWritable.class,
|
|
|
+ Text.class, LongWritable.class,
|
|
|
CompressionType.NONE);
|
|
|
- writer.append(new UTF8(name), new LongWritable(fileSize));
|
|
|
+ writer.append(new Text(name), new LongWritable(fileSize));
|
|
|
} catch(Exception e) {
|
|
|
throw new IOException(e.getLocalizedMessage());
|
|
|
} finally {
|
|
@@ -149,41 +165,44 @@ public class TestDFSIO extends TestCase {
|
|
|
* <li>i/o rate squared</li>
|
|
|
* </ul>
|
|
|
*/
|
|
|
- private abstract static class IOStatMapper extends IOMapperBase {
|
|
|
+ private abstract static class IOStatMapper<T> extends IOMapperBase<T> {
|
|
|
IOStatMapper() {
|
|
|
- super(fsConfig);
|
|
|
}
|
|
|
|
|
|
- void collectStats(OutputCollector<UTF8, UTF8> output,
|
|
|
+ void collectStats(OutputCollector<Text, Text> output,
|
|
|
String name,
|
|
|
long execTime,
|
|
|
- Object objSize) throws IOException {
|
|
|
- long totalSize = ((Long)objSize).longValue();
|
|
|
+ Long objSize) throws IOException {
|
|
|
+ long totalSize = objSize.longValue();
|
|
|
float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA);
|
|
|
LOG.info("Number of bytes processed = " + totalSize);
|
|
|
LOG.info("Exec time = " + execTime);
|
|
|
LOG.info("IO rate = " + ioRateMbSec);
|
|
|
|
|
|
- output.collect(new UTF8("l:tasks"), new UTF8(String.valueOf(1)));
|
|
|
- output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize)));
|
|
|
- output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
|
|
|
- output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000)));
|
|
|
- output.collect(new UTF8("f:sqrate"), new UTF8(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
|
|
|
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"),
|
|
|
+ new Text(String.valueOf(1)));
|
|
|
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"),
|
|
|
+ new Text(String.valueOf(totalSize)));
|
|
|
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"),
|
|
|
+ new Text(String.valueOf(execTime)));
|
|
|
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"),
|
|
|
+ new Text(String.valueOf(ioRateMbSec*1000)));
|
|
|
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"),
|
|
|
+ new Text(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Write mapper class.
|
|
|
*/
|
|
|
- public static class WriteMapper extends IOStatMapper {
|
|
|
+ public static class WriteMapper extends IOStatMapper<Long> {
|
|
|
|
|
|
public WriteMapper() {
|
|
|
- super();
|
|
|
for(int i=0; i < bufferSize; i++)
|
|
|
buffer[i] = (byte)('0' + i % 50);
|
|
|
}
|
|
|
|
|
|
- public Object doIO(Reporter reporter,
|
|
|
+ public Long doIO(Reporter reporter,
|
|
|
String name,
|
|
|
long totalSize
|
|
|
) throws IOException {
|
|
@@ -205,22 +224,24 @@ public class TestDFSIO extends TestCase {
|
|
|
} finally {
|
|
|
out.close();
|
|
|
}
|
|
|
- return new Long(totalSize);
|
|
|
+ return Long.valueOf(totalSize);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static void writeTest(FileSystem fs)
|
|
|
- throws IOException {
|
|
|
+ private static void writeTest(FileSystem fs, Configuration fsConfig)
|
|
|
+ throws IOException {
|
|
|
|
|
|
fs.delete(DATA_DIR, true);
|
|
|
fs.delete(WRITE_DIR, true);
|
|
|
|
|
|
- runIOTest(WriteMapper.class, WRITE_DIR);
|
|
|
+ runIOTest(WriteMapper.class, WRITE_DIR, fsConfig);
|
|
|
}
|
|
|
|
|
|
- private static void runIOTest( Class<? extends Mapper> mapperClass,
|
|
|
- Path outputDir
|
|
|
- ) throws IOException {
|
|
|
+ @SuppressWarnings("deprecation")
|
|
|
+ private static void runIOTest(
|
|
|
+ Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass,
|
|
|
+ Path outputDir,
|
|
|
+ Configuration fsConfig) throws IOException {
|
|
|
JobConf job = new JobConf(fsConfig, TestDFSIO.class);
|
|
|
|
|
|
FileInputFormat.setInputPaths(job, CONTROL_DIR);
|
|
@@ -230,8 +251,8 @@ public class TestDFSIO extends TestCase {
|
|
|
job.setReducerClass(AccumulatingReducer.class);
|
|
|
|
|
|
FileOutputFormat.setOutputPath(job, outputDir);
|
|
|
- job.setOutputKeyClass(UTF8.class);
|
|
|
- job.setOutputValueClass(UTF8.class);
|
|
|
+ job.setOutputKeyClass(Text.class);
|
|
|
+ job.setOutputValueClass(Text.class);
|
|
|
job.setNumReduceTasks(1);
|
|
|
JobClient.runJob(job);
|
|
|
}
|
|
@@ -239,13 +260,12 @@ public class TestDFSIO extends TestCase {
|
|
|
/**
|
|
|
* Read mapper class.
|
|
|
*/
|
|
|
- public static class ReadMapper extends IOStatMapper {
|
|
|
+ public static class ReadMapper extends IOStatMapper<Long> {
|
|
|
|
|
|
public ReadMapper() {
|
|
|
- super();
|
|
|
}
|
|
|
|
|
|
- public Object doIO(Reporter reporter,
|
|
|
+ public Long doIO(Reporter reporter,
|
|
|
String name,
|
|
|
long totalSize
|
|
|
) throws IOException {
|
|
@@ -264,22 +284,22 @@ public class TestDFSIO extends TestCase {
|
|
|
} finally {
|
|
|
in.close();
|
|
|
}
|
|
|
- return new Long(totalSize);
|
|
|
+ return Long.valueOf(totalSize);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static void readTest(FileSystem fs) throws IOException {
|
|
|
+ private static void readTest(FileSystem fs, Configuration fsConfig)
|
|
|
+ throws IOException {
|
|
|
fs.delete(READ_DIR, true);
|
|
|
- runIOTest(ReadMapper.class, READ_DIR);
|
|
|
+ runIOTest(ReadMapper.class, READ_DIR, fsConfig);
|
|
|
}
|
|
|
|
|
|
- private static void sequentialTest(
|
|
|
- FileSystem fs,
|
|
|
+ private static void sequentialTest(FileSystem fs,
|
|
|
int testType,
|
|
|
int fileSize,
|
|
|
int nrFiles
|
|
|
) throws Exception {
|
|
|
- IOStatMapper ioer = null;
|
|
|
+ IOStatMapper<Long> ioer = null;
|
|
|
if (testType == TEST_TYPE_READ)
|
|
|
ioer = new ReadMapper();
|
|
|
else if (testType == TEST_TYPE_WRITE)
|
|
@@ -292,21 +312,102 @@ public class TestDFSIO extends TestCase {
|
|
|
MEGA*fileSize);
|
|
|
}
|
|
|
|
|
|
- public static void main(String[] args) {
|
|
|
+ public static void main(String[] args) throws Exception{
|
|
|
+ int res = ToolRunner.run(new TestDFSIO(), args);
|
|
|
+ System.exit(res);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void analyzeResult( FileSystem fs,
|
|
|
+ int testType,
|
|
|
+ long execTime,
|
|
|
+ String resFileName
|
|
|
+ ) throws IOException {
|
|
|
+ Path reduceFile;
|
|
|
+ if (testType == TEST_TYPE_WRITE)
|
|
|
+ reduceFile = new Path(WRITE_DIR, "part-00000");
|
|
|
+ else
|
|
|
+ reduceFile = new Path(READ_DIR, "part-00000");
|
|
|
+ long tasks = 0;
|
|
|
+ long size = 0;
|
|
|
+ long time = 0;
|
|
|
+ float rate = 0;
|
|
|
+ float sqrate = 0;
|
|
|
+ DataInputStream in = null;
|
|
|
+ BufferedReader lines = null;
|
|
|
+ try {
|
|
|
+ in = new DataInputStream(fs.open(reduceFile));
|
|
|
+ lines = new BufferedReader(new InputStreamReader(in));
|
|
|
+ String line;
|
|
|
+ while((line = lines.readLine()) != null) {
|
|
|
+ StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
|
|
|
+ String attr = tokens.nextToken();
|
|
|
+ if (attr.endsWith(":tasks"))
|
|
|
+ tasks = Long.parseLong(tokens.nextToken());
|
|
|
+ else if (attr.endsWith(":size"))
|
|
|
+ size = Long.parseLong(tokens.nextToken());
|
|
|
+ else if (attr.endsWith(":time"))
|
|
|
+ time = Long.parseLong(tokens.nextToken());
|
|
|
+ else if (attr.endsWith(":rate"))
|
|
|
+ rate = Float.parseFloat(tokens.nextToken());
|
|
|
+ else if (attr.endsWith(":sqrate"))
|
|
|
+ sqrate = Float.parseFloat(tokens.nextToken());
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ if(in != null) in.close();
|
|
|
+ if(lines != null) lines.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ double med = rate / 1000 / tasks;
|
|
|
+ double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med));
|
|
|
+ String resultLines[] = {
|
|
|
+ "----- TestDFSIO ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" :
|
|
|
+ (testType == TEST_TYPE_READ) ? "read" :
|
|
|
+ "unknown"),
|
|
|
+ " Date & time: " + new Date(System.currentTimeMillis()),
|
|
|
+ " Number of files: " + tasks,
|
|
|
+ "Total MBytes processed: " + size/MEGA,
|
|
|
+ " Throughput mb/sec: " + size * 1000.0 / (time * MEGA),
|
|
|
+ "Average IO rate mb/sec: " + med,
|
|
|
+ " IO rate std deviation: " + stdDev,
|
|
|
+ " Test exec time sec: " + (float)execTime / 1000,
|
|
|
+ "" };
|
|
|
+
|
|
|
+ PrintStream res = null;
|
|
|
+ try {
|
|
|
+ res = new PrintStream(new FileOutputStream(new File(resFileName), true));
|
|
|
+ for(int i = 0; i < resultLines.length; i++) {
|
|
|
+ LOG.info(resultLines[i]);
|
|
|
+ res.println(resultLines[i]);
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ if(res != null) res.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void cleanup(FileSystem fs) throws IOException {
|
|
|
+ LOG.info("Cleaning up test files");
|
|
|
+ fs.delete(new Path(TEST_ROOT_DIR), true);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public int run(String[] args) throws Exception {
|
|
|
int testType = TEST_TYPE_READ;
|
|
|
int bufferSize = DEFAULT_BUFFER_SIZE;
|
|
|
int fileSize = 1;
|
|
|
int nrFiles = 1;
|
|
|
String resFileName = DEFAULT_RES_FILE_NAME;
|
|
|
boolean isSequential = false;
|
|
|
-
|
|
|
- String version="TestFDSIO.0.0.4";
|
|
|
- String usage = "Usage: TestFDSIO -read | -write | -clean [-nrFiles N] [-fileSize MB] [-resFile resultFileName] [-bufferSize Bytes] ";
|
|
|
+
|
|
|
+ String className = TestDFSIO.class.getSimpleName();
|
|
|
+ String version = className + ".0.0.4";
|
|
|
+ String usage = "Usage: " + className + " -read | -write | -clean " +
|
|
|
+ "[-nrFiles N] [-fileSize MB] [-resFile resultFileName] " +
|
|
|
+ "[-bufferSize Bytes] ";
|
|
|
|
|
|
System.out.println(version);
|
|
|
if (args.length == 0) {
|
|
|
System.err.println(usage);
|
|
|
- System.exit(-1);
|
|
|
+ return -1;
|
|
|
}
|
|
|
for (int i = 0; i < args.length; i++) { // parse command line
|
|
|
if (args[i].startsWith("-read")) {
|
|
@@ -333,6 +434,7 @@ public class TestDFSIO extends TestCase {
|
|
|
LOG.info("bufferSize = " + bufferSize);
|
|
|
|
|
|
try {
|
|
|
+ Configuration fsConfig = new Configuration(getConf());
|
|
|
fsConfig.setInt("test.io.file.buffer.size", bufferSize);
|
|
|
FileSystem fs = FileSystem.get(fsConfig);
|
|
|
|
|
@@ -342,89 +444,25 @@ public class TestDFSIO extends TestCase {
|
|
|
long execTime = System.currentTimeMillis() - tStart;
|
|
|
String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000;
|
|
|
LOG.info(resultLine);
|
|
|
- return;
|
|
|
+ return 0;
|
|
|
}
|
|
|
if (testType == TEST_TYPE_CLEANUP) {
|
|
|
cleanup(fs);
|
|
|
- return;
|
|
|
+ return 0;
|
|
|
}
|
|
|
- createControlFile(fs, fileSize, nrFiles);
|
|
|
+ createControlFile(fs, fileSize, nrFiles, fsConfig);
|
|
|
long tStart = System.currentTimeMillis();
|
|
|
if (testType == TEST_TYPE_WRITE)
|
|
|
- writeTest(fs);
|
|
|
+ writeTest(fs, fsConfig);
|
|
|
if (testType == TEST_TYPE_READ)
|
|
|
- readTest(fs);
|
|
|
+ readTest(fs, fsConfig);
|
|
|
long execTime = System.currentTimeMillis() - tStart;
|
|
|
|
|
|
analyzeResult(fs, testType, execTime, resFileName);
|
|
|
} catch(Exception e) {
|
|
|
System.err.print(StringUtils.stringifyException(e));
|
|
|
- System.exit(-1);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private static void analyzeResult( FileSystem fs,
|
|
|
- int testType,
|
|
|
- long execTime,
|
|
|
- String resFileName
|
|
|
- ) throws IOException {
|
|
|
- Path reduceFile;
|
|
|
- if (testType == TEST_TYPE_WRITE)
|
|
|
- reduceFile = new Path(WRITE_DIR, "part-00000");
|
|
|
- else
|
|
|
- reduceFile = new Path(READ_DIR, "part-00000");
|
|
|
- DataInputStream in;
|
|
|
- in = new DataInputStream(fs.open(reduceFile));
|
|
|
-
|
|
|
- BufferedReader lines;
|
|
|
- lines = new BufferedReader(new InputStreamReader(in));
|
|
|
- long tasks = 0;
|
|
|
- long size = 0;
|
|
|
- long time = 0;
|
|
|
- float rate = 0;
|
|
|
- float sqrate = 0;
|
|
|
- String line;
|
|
|
- while((line = lines.readLine()) != null) {
|
|
|
- StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
|
|
|
- String attr = tokens.nextToken();
|
|
|
- if (attr.endsWith(":tasks"))
|
|
|
- tasks = Long.parseLong(tokens.nextToken());
|
|
|
- else if (attr.endsWith(":size"))
|
|
|
- size = Long.parseLong(tokens.nextToken());
|
|
|
- else if (attr.endsWith(":time"))
|
|
|
- time = Long.parseLong(tokens.nextToken());
|
|
|
- else if (attr.endsWith(":rate"))
|
|
|
- rate = Float.parseFloat(tokens.nextToken());
|
|
|
- else if (attr.endsWith(":sqrate"))
|
|
|
- sqrate = Float.parseFloat(tokens.nextToken());
|
|
|
- }
|
|
|
-
|
|
|
- double med = rate / 1000 / tasks;
|
|
|
- double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med));
|
|
|
- String resultLines[] = {
|
|
|
- "----- TestDFSIO ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" :
|
|
|
- (testType == TEST_TYPE_READ) ? "read" :
|
|
|
- "unknown"),
|
|
|
- " Date & time: " + new Date(System.currentTimeMillis()),
|
|
|
- " Number of files: " + tasks,
|
|
|
- "Total MBytes processed: " + size/MEGA,
|
|
|
- " Throughput mb/sec: " + size * 1000.0 / (time * MEGA),
|
|
|
- "Average IO rate mb/sec: " + med,
|
|
|
- " IO rate std deviation: " + stdDev,
|
|
|
- " Test exec time sec: " + (float)execTime / 1000,
|
|
|
- "" };
|
|
|
-
|
|
|
- PrintStream res = new PrintStream(
|
|
|
- new FileOutputStream(
|
|
|
- new File(resFileName), true));
|
|
|
- for(int i = 0; i < resultLines.length; i++) {
|
|
|
- LOG.info(resultLines[i]);
|
|
|
- res.println(resultLines[i]);
|
|
|
+ return -1;
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- private static void cleanup(FileSystem fs) throws IOException {
|
|
|
- LOG.info("Cleaning up test files");
|
|
|
- fs.delete(new Path(TEST_ROOT_DIR), true);
|
|
|
+ return 0;
|
|
|
}
|
|
|
}
|