|
@@ -18,45 +18,42 @@
|
|
|
|
|
|
package org.apache.hadoop.hdfs;
|
|
|
|
|
|
-import java.io.IOException;
|
|
|
-import java.util.Date;
|
|
|
+import java.io.BufferedReader;
|
|
|
import java.io.DataInputStream;
|
|
|
+import java.io.File;
|
|
|
import java.io.FileOutputStream;
|
|
|
+import java.io.IOException;
|
|
|
import java.io.InputStreamReader;
|
|
|
import java.io.PrintStream;
|
|
|
-import java.io.File;
|
|
|
-import java.io.BufferedReader;
|
|
|
-import java.util.StringTokenizer;
|
|
|
import java.net.InetAddress;
|
|
|
import java.text.SimpleDateFormat;
|
|
|
+import java.util.Date;
|
|
|
import java.util.Iterator;
|
|
|
+import java.util.StringTokenizer;
|
|
|
|
|
|
-import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
-
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.conf.Configured;
|
|
|
-
|
|
|
-import org.apache.hadoop.fs.Path;
|
|
|
-import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
+import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
+import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
-
|
|
|
-import org.apache.hadoop.io.Text;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.io.LongWritable;
|
|
|
-import org.apache.hadoop.io.SequenceFile.CompressionType;
|
|
|
import org.apache.hadoop.io.SequenceFile;
|
|
|
-
|
|
|
+import org.apache.hadoop.io.SequenceFile.CompressionType;
|
|
|
+import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.mapred.FileInputFormat;
|
|
|
import org.apache.hadoop.mapred.FileOutputFormat;
|
|
|
-import org.apache.hadoop.mapred.Mapper;
|
|
|
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
|
|
|
import org.apache.hadoop.mapred.JobClient;
|
|
|
+import org.apache.hadoop.mapred.JobConf;
|
|
|
import org.apache.hadoop.mapred.MapReduceBase;
|
|
|
-import org.apache.hadoop.mapred.Reporter;
|
|
|
+import org.apache.hadoop.mapred.Mapper;
|
|
|
import org.apache.hadoop.mapred.OutputCollector;
|
|
|
-import org.apache.hadoop.mapred.JobConf;
|
|
|
import org.apache.hadoop.mapred.Reducer;
|
|
|
+import org.apache.hadoop.mapred.Reporter;
|
|
|
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
|
|
|
|
|
|
/**
|
|
|
* This program executes a specified operation that applies load to
|
|
@@ -149,7 +146,7 @@ public class NNBench {
|
|
|
try {
|
|
|
writer = SequenceFile.createWriter(tempFS, config, filePath, Text.class,
|
|
|
LongWritable.class, CompressionType.NONE);
|
|
|
- writer.append(new Text(strFileName), new LongWritable(0l));
|
|
|
+ writer.append(new Text(strFileName), new LongWritable(i));
|
|
|
} finally {
|
|
|
if (writer != null) {
|
|
|
writer.close();
|
|
@@ -309,14 +306,7 @@ public class NNBench {
|
|
|
*/
|
|
|
private static void analyzeResults() throws IOException {
|
|
|
final FileSystem fs = FileSystem.get(config);
|
|
|
- Path reduceFile = new Path(new Path(baseDir, OUTPUT_DIR_NAME),
|
|
|
- "part-00000");
|
|
|
-
|
|
|
- DataInputStream in;
|
|
|
- in = new DataInputStream(fs.open(reduceFile));
|
|
|
-
|
|
|
- BufferedReader lines;
|
|
|
- lines = new BufferedReader(new InputStreamReader(in));
|
|
|
+ Path reduceDir = new Path(baseDir, OUTPUT_DIR_NAME);
|
|
|
|
|
|
long totalTimeAL1 = 0l;
|
|
|
long totalTimeAL2 = 0l;
|
|
@@ -327,32 +317,38 @@ public class NNBench {
|
|
|
|
|
|
long mapStartTimeTPmS = 0l;
|
|
|
long mapEndTimeTPmS = 0l;
|
|
|
-
|
|
|
- String resultTPSLine1 = null;
|
|
|
- String resultTPSLine2 = null;
|
|
|
- String resultALLine1 = null;
|
|
|
- String resultALLine2 = null;
|
|
|
-
|
|
|
- String line;
|
|
|
- while((line = lines.readLine()) != null) {
|
|
|
- StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%;");
|
|
|
- String attr = tokens.nextToken();
|
|
|
- if (attr.endsWith(":totalTimeAL1")) {
|
|
|
- totalTimeAL1 = Long.parseLong(tokens.nextToken());
|
|
|
- } else if (attr.endsWith(":totalTimeAL2")) {
|
|
|
- totalTimeAL2 = Long.parseLong(tokens.nextToken());
|
|
|
- } else if (attr.endsWith(":totalTimeTPmS")) {
|
|
|
- totalTimeTPmS = Long.parseLong(tokens.nextToken());
|
|
|
- } else if (attr.endsWith(":latemaps")) {
|
|
|
- lateMaps = Long.parseLong(tokens.nextToken());
|
|
|
- } else if (attr.endsWith(":numOfExceptions")) {
|
|
|
- numOfExceptions = Long.parseLong(tokens.nextToken());
|
|
|
- } else if (attr.endsWith(":successfulFileOps")) {
|
|
|
- successfulFileOps = Long.parseLong(tokens.nextToken());
|
|
|
- } else if (attr.endsWith(":mapStartTimeTPmS")) {
|
|
|
- mapStartTimeTPmS = Long.parseLong(tokens.nextToken());
|
|
|
- } else if (attr.endsWith(":mapEndTimeTPmS")) {
|
|
|
- mapEndTimeTPmS = Long.parseLong(tokens.nextToken());
|
|
|
+
|
|
|
+ FileStatus[] fss = fs.listStatus(reduceDir);
|
|
|
+ for (FileStatus status : fss) {
|
|
|
+
|
|
|
+ Path reduceFile = status.getPath();
|
|
|
+ DataInputStream in;
|
|
|
+ in = new DataInputStream(fs.open(reduceFile));
|
|
|
+
|
|
|
+ BufferedReader lines;
|
|
|
+ lines = new BufferedReader(new InputStreamReader(in));
|
|
|
+
|
|
|
+ String line;
|
|
|
+ while ((line = lines.readLine()) != null) {
|
|
|
+ StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%;");
|
|
|
+ String attr = tokens.nextToken();
|
|
|
+ if (attr.endsWith(":totalTimeAL1")) {
|
|
|
+ totalTimeAL1 = Long.parseLong(tokens.nextToken());
|
|
|
+ } else if (attr.endsWith(":totalTimeAL2")) {
|
|
|
+ totalTimeAL2 = Long.parseLong(tokens.nextToken());
|
|
|
+ } else if (attr.endsWith(":totalTimeTPmS")) {
|
|
|
+ totalTimeTPmS = Long.parseLong(tokens.nextToken());
|
|
|
+ } else if (attr.endsWith(":latemaps")) {
|
|
|
+ lateMaps = Long.parseLong(tokens.nextToken());
|
|
|
+ } else if (attr.endsWith(":numOfExceptions")) {
|
|
|
+ numOfExceptions = Long.parseLong(tokens.nextToken());
|
|
|
+ } else if (attr.endsWith(":successfulFileOps")) {
|
|
|
+ successfulFileOps = Long.parseLong(tokens.nextToken());
|
|
|
+ } else if (attr.endsWith(":mapStartTimeTPmS")) {
|
|
|
+ mapStartTimeTPmS = Long.parseLong(tokens.nextToken());
|
|
|
+ } else if (attr.endsWith(":mapEndTimeTPmS")) {
|
|
|
+ mapEndTimeTPmS = Long.parseLong(tokens.nextToken());
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -377,6 +373,11 @@ public class NNBench {
|
|
|
(double) successfulFileOps :
|
|
|
(double) totalTimeTPmS / successfulFileOps;
|
|
|
|
|
|
+ String resultTPSLine1 = null;
|
|
|
+ String resultTPSLine2 = null;
|
|
|
+ String resultALLine1 = null;
|
|
|
+ String resultALLine2 = null;
|
|
|
+
|
|
|
if (operation.equals(OP_CREATE_WRITE)) {
|
|
|
// For create/write/close, it is treated as two transactions,
|
|
|
// since a file create from a client perspective involves create and close
|
|
@@ -699,18 +700,21 @@ public class NNBench {
|
|
|
successfulFileOps = 0l;
|
|
|
|
|
|
if (barrier()) {
|
|
|
+ String fileName = "file_" + value;
|
|
|
if (op.equals(OP_CREATE_WRITE)) {
|
|
|
startTimeTPmS = System.currentTimeMillis();
|
|
|
- doCreateWriteOp("file_" + hostName + "_", reporter);
|
|
|
+ doCreateWriteOp(fileName, reporter);
|
|
|
} else if (op.equals(OP_OPEN_READ)) {
|
|
|
startTimeTPmS = System.currentTimeMillis();
|
|
|
- doOpenReadOp("file_" + hostName + "_", reporter);
|
|
|
+ doOpenReadOp(fileName, reporter);
|
|
|
} else if (op.equals(OP_RENAME)) {
|
|
|
startTimeTPmS = System.currentTimeMillis();
|
|
|
- doRenameOp("file_" + hostName + "_", reporter);
|
|
|
+ doRenameOp(fileName, reporter);
|
|
|
} else if (op.equals(OP_DELETE)) {
|
|
|
startTimeTPmS = System.currentTimeMillis();
|
|
|
- doDeleteOp("file_" + hostName + "_", reporter);
|
|
|
+ } else {
|
|
|
+ throw new IllegalArgumentException(
|
|
|
+ "unsupported operation [" + op + "]");
|
|
|
}
|
|
|
|
|
|
endTimeTPms = System.currentTimeMillis();
|
|
@@ -777,9 +781,8 @@ public class NNBench {
|
|
|
|
|
|
reporter.setStatus("Finish "+ l + " files");
|
|
|
} catch (IOException e) {
|
|
|
- LOG.info("Exception recorded in op: " +
|
|
|
- "Create/Write/Close");
|
|
|
-
|
|
|
+ LOG.error("Exception recorded in op: Create/Write/Close, "
|
|
|
+ + "file: \"" + filePath + "\"", e);
|
|
|
numOfExceptions++;
|
|
|
}
|
|
|
}
|
|
@@ -822,7 +825,8 @@ public class NNBench {
|
|
|
|
|
|
reporter.setStatus("Finish "+ l + " files");
|
|
|
} catch (IOException e) {
|
|
|
- LOG.info("Exception recorded in op: OpenRead " + e);
|
|
|
+ LOG.error("Exception recorded in op: OpenRead, " + "file: \""
|
|
|
+ + filePath + "\"", e);
|
|
|
numOfExceptions++;
|
|
|
}
|
|
|
}
|
|
@@ -856,8 +860,8 @@ public class NNBench {
|
|
|
|
|
|
reporter.setStatus("Finish "+ l + " files");
|
|
|
} catch (IOException e) {
|
|
|
- LOG.info("Exception recorded in op: Rename");
|
|
|
-
|
|
|
+ LOG.error("Exception recorded in op: Rename, " + "file: \""
|
|
|
+ + filePath + "\"", e);
|
|
|
numOfExceptions++;
|
|
|
}
|
|
|
}
|
|
@@ -889,8 +893,8 @@ public class NNBench {
|
|
|
|
|
|
reporter.setStatus("Finish "+ l + " files");
|
|
|
} catch (IOException e) {
|
|
|
- LOG.info("Exception in recorded op: Delete");
|
|
|
-
|
|
|
+ LOG.error("Exception recorded in op: Delete, " + "file: \""
|
|
|
+ + filePath + "\"", e);
|
|
|
numOfExceptions++;
|
|
|
}
|
|
|
}
|