|
@@ -22,6 +22,8 @@ import java.io.DataInput;
|
|
|
import java.io.DataOutput;
|
|
|
import java.io.IOException;
|
|
|
import java.net.URI;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.text.NumberFormat;
|
|
|
|
|
@@ -29,11 +31,15 @@ import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configurable;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.dfs.DistributedFileSystem;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.FileUtil;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.LocalDirAllocator;
|
|
|
+import org.apache.hadoop.fs.RawLocalFileSystem;
|
|
|
+import org.apache.hadoop.fs.kfs.KosmosFileSystem;
|
|
|
+import org.apache.hadoop.fs.s3.S3FileSystem;
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.io.UTF8;
|
|
|
import org.apache.hadoop.io.Writable;
|
|
@@ -59,6 +65,16 @@ abstract class Task implements Writable, Configurable {
|
|
|
REDUCE_INPUT_RECORDS,
|
|
|
REDUCE_OUTPUT_RECORDS
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Counters to measure the usage of the different file systems.
|
|
|
+ */
|
|
|
+ protected static enum FileSystemCounter {
|
|
|
+ LOCAL_READ, LOCAL_WRITE,
|
|
|
+ HDFS_READ, HDFS_WRITE,
|
|
|
+ S3_READ, S3_WRITE,
|
|
|
+ KFS_READ, KFSWRITE
|
|
|
+ }
|
|
|
|
|
|
///////////////////////////////////////////////////////////
|
|
|
// Helper methods to construct task-output paths
|
|
@@ -293,6 +309,7 @@ abstract class Task implements Writable, Configurable {
|
|
|
|
|
|
if (sendProgress) {
|
|
|
// we need to send progress update
|
|
|
+ updateCounters();
|
|
|
taskStatus.statusUpdate(taskProgress.get(), taskProgress.toString(),
|
|
|
counters);
|
|
|
taskFound = umbilical.statusUpdate(taskId, taskStatus);
|
|
@@ -362,9 +379,81 @@ abstract class Task implements Writable, Configurable {
|
|
|
setProgressFlag();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * An updater that tracks the last number reported for a given file
|
|
|
+ * system and only creates the counters when they are needed.
|
|
|
+ */
|
|
|
+ class FileSystemStatisticUpdater {
|
|
|
+ private long prevReadBytes = 0;
|
|
|
+ private long prevWriteBytes = 0;
|
|
|
+ private FileSystem.Statistics stats;
|
|
|
+ private Counters.Counter readCounter = null;
|
|
|
+ private Counters.Counter writeCounter = null;
|
|
|
+ private FileSystemCounter read;
|
|
|
+ private FileSystemCounter write;
|
|
|
+
|
|
|
+ FileSystemStatisticUpdater(FileSystemCounter read,
|
|
|
+ FileSystemCounter write,
|
|
|
+ Class<? extends FileSystem> cls) {
|
|
|
+ stats = FileSystem.getStatistics(cls);
|
|
|
+ this.read = read;
|
|
|
+ this.write = write;
|
|
|
+ }
|
|
|
+
|
|
|
+ void updateCounters() {
|
|
|
+ long newReadBytes = stats.getBytesRead();
|
|
|
+ long newWriteBytes = stats.getBytesWritten();
|
|
|
+ if (prevReadBytes != newReadBytes) {
|
|
|
+ if (readCounter == null) {
|
|
|
+ readCounter = counters.findCounter(read);
|
|
|
+ }
|
|
|
+ readCounter.increment(newReadBytes - prevReadBytes);
|
|
|
+ prevReadBytes = newReadBytes;
|
|
|
+ }
|
|
|
+ if (prevWriteBytes != newWriteBytes) {
|
|
|
+ if (writeCounter == null) {
|
|
|
+ writeCounter = counters.findCounter(write);
|
|
|
+ }
|
|
|
+ writeCounter.increment(newWriteBytes - prevWriteBytes);
|
|
|
+ prevWriteBytes = newWriteBytes;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * A list of all of the file systems that we want to report on.
|
|
|
+ */
|
|
|
+ private List<FileSystemStatisticUpdater> statisticUpdaters =
|
|
|
+ new ArrayList<FileSystemStatisticUpdater>();
|
|
|
+ {
|
|
|
+ statisticUpdaters.add
|
|
|
+ (new FileSystemStatisticUpdater(FileSystemCounter.LOCAL_READ,
|
|
|
+ FileSystemCounter.LOCAL_WRITE,
|
|
|
+ RawLocalFileSystem.class));
|
|
|
+ statisticUpdaters.add
|
|
|
+ (new FileSystemStatisticUpdater(FileSystemCounter.HDFS_READ,
|
|
|
+ FileSystemCounter.HDFS_WRITE,
|
|
|
+ DistributedFileSystem.class));
|
|
|
+ statisticUpdaters.add
|
|
|
+ (new FileSystemStatisticUpdater(FileSystemCounter.KFS_READ,
|
|
|
+ FileSystemCounter.KFSWRITE,
|
|
|
+ KosmosFileSystem.class));
|
|
|
+ statisticUpdaters.add
|
|
|
+ (new FileSystemStatisticUpdater(FileSystemCounter.S3_READ,
|
|
|
+ FileSystemCounter.S3_WRITE,
|
|
|
+ S3FileSystem.class));
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void updateCounters() {
|
|
|
+ for(FileSystemStatisticUpdater updater: statisticUpdaters) {
|
|
|
+ updater.updateCounters();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public void done(TaskUmbilicalProtocol umbilical) throws IOException {
|
|
|
int retries = 10;
|
|
|
boolean needProgress = true;
|
|
|
+ updateCounters();
|
|
|
taskDone.set(true);
|
|
|
while (true) {
|
|
|
try {
|