|
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs;
|
|
|
import java.io.Closeable;
|
|
|
import java.io.FileNotFoundException;
|
|
|
import java.io.IOException;
|
|
|
+import java.lang.ref.WeakReference;
|
|
|
import java.net.URI;
|
|
|
import java.net.URISyntaxException;
|
|
|
import java.security.PrivilegedExceptionAction;
|
|
@@ -31,6 +32,7 @@ import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.IdentityHashMap;
|
|
|
import java.util.Iterator;
|
|
|
+import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.NoSuchElementException;
|
|
@@ -2502,28 +2504,149 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Tracks statistics about how many reads, writes, and so forth have been
|
|
|
+ * done in a FileSystem.
|
|
|
+ *
|
|
|
+ * Since there is only one of these objects per FileSystem, there will
|
|
|
+ * typically be many threads writing to this object. Almost every operation
|
|
|
+ * on an open file will involve a write to this object. In contrast, reading
|
|
|
+ * statistics is done infrequently by most programs, and not at all by others.
|
|
|
+ * Hence, this is optimized for writes.
|
|
|
+ *
|
|
|
+ * Each thread writes to its own thread-local area of memory. This removes
|
|
|
+ * contention and allows us to scale up to many, many threads. To read
|
|
|
+ * statistics, the reader thread totals up the contents of all of the
|
|
|
+ * thread-local data areas.
|
|
|
+ */
|
|
|
public static final class Statistics {
|
|
|
+ /**
|
|
|
+ * Statistics data.
|
|
|
+ *
|
|
|
+ * There is only a single writer to thread-local StatisticsData objects.
|
|
|
+ * Hence, volatile is adequate here-- we do not need AtomicLong or similar
|
|
|
+ * to prevent lost updates.
|
|
|
+ * The Java specification guarantees that updates to volatile longs will
|
|
|
+ * be perceived as atomic with respect to other threads, which is all we
|
|
|
+ * need.
|
|
|
+ */
|
|
|
+ private static class StatisticsData {
|
|
|
+ volatile long bytesRead;
|
|
|
+ volatile long bytesWritten;
|
|
|
+ volatile int readOps;
|
|
|
+ volatile int largeReadOps;
|
|
|
+ volatile int writeOps;
|
|
|
+ /**
|
|
|
+ * Stores a weak reference to the thread owning this StatisticsData.
|
|
|
+ * This allows us to remove StatisticsData objects that pertain to
|
|
|
+ * threads that no longer exist.
|
|
|
+ */
|
|
|
+ final WeakReference<Thread> owner;
|
|
|
+
|
|
|
+ StatisticsData(WeakReference<Thread> owner) {
|
|
|
+ this.owner = owner;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Add another StatisticsData object to this one.
|
|
|
+ */
|
|
|
+ void add(StatisticsData other) {
|
|
|
+ this.bytesRead += other.bytesRead;
|
|
|
+ this.bytesWritten += other.bytesWritten;
|
|
|
+ this.readOps += other.readOps;
|
|
|
+ this.largeReadOps += other.largeReadOps;
|
|
|
+ this.writeOps += other.writeOps;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Negate the values of all statistics.
|
|
|
+ */
|
|
|
+ void negate() {
|
|
|
+ this.bytesRead = -this.bytesRead;
|
|
|
+ this.bytesWritten = -this.bytesWritten;
|
|
|
+ this.readOps = -this.readOps;
|
|
|
+ this.largeReadOps = -this.largeReadOps;
|
|
|
+ this.writeOps = -this.writeOps;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
|
|
|
+ + readOps + " read ops, " + largeReadOps + " large read ops, "
|
|
|
+ + writeOps + " write ops";
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private interface StatisticsAggregator<T> {
|
|
|
+ void accept(StatisticsData data);
|
|
|
+ T aggregate();
|
|
|
+ }
|
|
|
+
|
|
|
private final String scheme;
|
|
|
- private AtomicLong bytesRead = new AtomicLong();
|
|
|
- private AtomicLong bytesWritten = new AtomicLong();
|
|
|
- private AtomicInteger readOps = new AtomicInteger();
|
|
|
- private AtomicInteger largeReadOps = new AtomicInteger();
|
|
|
- private AtomicInteger writeOps = new AtomicInteger();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * rootData is data that doesn't belong to any thread, but will be added
|
|
|
+ * to the totals. This is useful for making copies of Statistics objects,
|
|
|
+ * and for storing data that pertains to threads that have been garbage
|
|
|
+ * collected. Protected by the Statistics lock.
|
|
|
+ */
|
|
|
+ private final StatisticsData rootData;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Thread-local data.
|
|
|
+ */
|
|
|
+ private final ThreadLocal<StatisticsData> threadData;
|
|
|
|
|
|
+ /**
|
|
|
+ * List of all thread-local data areas. Protected by the Statistics lock.
|
|
|
+ */
|
|
|
+ private LinkedList<StatisticsData> allData;
|
|
|
+
|
|
|
public Statistics(String scheme) {
|
|
|
this.scheme = scheme;
|
|
|
+ this.rootData = new StatisticsData(null);
|
|
|
+ this.threadData = new ThreadLocal<StatisticsData>();
|
|
|
+ this.allData = null;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Copy constructor.
|
|
|
*
|
|
|
- * @param st
|
|
|
- * The input Statistics object which is cloned.
|
|
|
+ * @param other The input Statistics object which is cloned.
|
|
|
+ */
|
|
|
+ public Statistics(Statistics other) {
|
|
|
+ this.scheme = other.scheme;
|
|
|
+ this.rootData = new StatisticsData(null);
|
|
|
+ other.visitAll(new StatisticsAggregator<Void>() {
|
|
|
+ @Override
|
|
|
+ public void accept(StatisticsData data) {
|
|
|
+ rootData.add(data);
|
|
|
+ }
|
|
|
+
|
|
|
+ public Void aggregate() {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ this.threadData = new ThreadLocal<StatisticsData>();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get or create the thread-local data associated with the current thread.
|
|
|
*/
|
|
|
- public Statistics(Statistics st) {
|
|
|
- this.scheme = st.scheme;
|
|
|
- this.bytesRead = new AtomicLong(st.bytesRead.longValue());
|
|
|
- this.bytesWritten = new AtomicLong(st.bytesWritten.longValue());
|
|
|
+ private StatisticsData getThreadData() {
|
|
|
+ StatisticsData data = threadData.get();
|
|
|
+ if (data == null) {
|
|
|
+ data = new StatisticsData(
|
|
|
+ new WeakReference<Thread>(Thread.currentThread()));
|
|
|
+ threadData.set(data);
|
|
|
+ synchronized(this) {
|
|
|
+ if (allData == null) {
|
|
|
+ allData = new LinkedList<StatisticsData>();
|
|
|
+ }
|
|
|
+ allData.add(data);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return data;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2531,7 +2654,7 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|
|
* @param newBytes the additional bytes read
|
|
|
*/
|
|
|
public void incrementBytesRead(long newBytes) {
|
|
|
- bytesRead.getAndAdd(newBytes);
|
|
|
+ getThreadData().bytesRead += newBytes;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2539,7 +2662,7 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|
|
* @param newBytes the additional bytes written
|
|
|
*/
|
|
|
public void incrementBytesWritten(long newBytes) {
|
|
|
- bytesWritten.getAndAdd(newBytes);
|
|
|
+ getThreadData().bytesWritten += newBytes;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2547,7 +2670,7 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|
|
* @param count number of read operations
|
|
|
*/
|
|
|
public void incrementReadOps(int count) {
|
|
|
- readOps.getAndAdd(count);
|
|
|
+ getThreadData().readOps += count;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2555,7 +2678,7 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|
|
* @param count number of large read operations
|
|
|
*/
|
|
|
public void incrementLargeReadOps(int count) {
|
|
|
- largeReadOps.getAndAdd(count);
|
|
|
+ getThreadData().largeReadOps += count;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2563,7 +2686,38 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|
|
* @param count number of write operations
|
|
|
*/
|
|
|
public void incrementWriteOps(int count) {
|
|
|
- writeOps.getAndAdd(count);
|
|
|
+ getThreadData().writeOps += count;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Apply the given aggregator to all StatisticsData objects associated with
|
|
|
+ * this Statistics object.
|
|
|
+ *
|
|
|
+ * For each StatisticsData object, we will call accept on the visitor.
|
|
|
+ * Finally, at the end, we will call aggregate to get the final total.
|
|
|
+ *
|
|
|
+ * @param The visitor to use.
|
|
|
+ * @return The total.
|
|
|
+ */
|
|
|
+ private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
|
|
|
+ visitor.accept(rootData);
|
|
|
+ if (allData != null) {
|
|
|
+ for (Iterator<StatisticsData> iter = allData.iterator();
|
|
|
+ iter.hasNext(); ) {
|
|
|
+ StatisticsData data = iter.next();
|
|
|
+ visitor.accept(data);
|
|
|
+ if (data.owner.get() == null) {
|
|
|
+ /*
|
|
|
+ * If the thread that created this thread-local data no
|
|
|
+ * longer exists, remove the StatisticsData from our list
|
|
|
+ * and fold the values into rootData.
|
|
|
+ */
|
|
|
+ rootData.add(data);
|
|
|
+ iter.remove();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return visitor.aggregate();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2571,7 +2725,18 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|
|
* @return the number of bytes
|
|
|
*/
|
|
|
public long getBytesRead() {
|
|
|
- return bytesRead.get();
|
|
|
+ return visitAll(new StatisticsAggregator<Long>() {
|
|
|
+ private long bytesRead = 0;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void accept(StatisticsData data) {
|
|
|
+ bytesRead += data.bytesRead;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Long aggregate() {
|
|
|
+ return bytesRead;
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2579,7 +2744,18 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|
|
* @return the number of bytes
|
|
|
*/
|
|
|
public long getBytesWritten() {
|
|
|
- return bytesWritten.get();
|
|
|
+ return visitAll(new StatisticsAggregator<Long>() {
|
|
|
+ private long bytesWritten = 0;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void accept(StatisticsData data) {
|
|
|
+ bytesWritten += data.bytesWritten;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Long aggregate() {
|
|
|
+ return bytesWritten;
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2587,7 +2763,19 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|
|
* @return number of read operations
|
|
|
*/
|
|
|
public int getReadOps() {
|
|
|
- return readOps.get() + largeReadOps.get();
|
|
|
+ return visitAll(new StatisticsAggregator<Integer>() {
|
|
|
+ private int readOps = 0;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void accept(StatisticsData data) {
|
|
|
+ readOps += data.readOps;
|
|
|
+ readOps += data.largeReadOps;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Integer aggregate() {
|
|
|
+ return readOps;
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2596,7 +2784,18 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|
|
* @return number of large read operations
|
|
|
*/
|
|
|
public int getLargeReadOps() {
|
|
|
- return largeReadOps.get();
|
|
|
+ return visitAll(new StatisticsAggregator<Integer>() {
|
|
|
+ private int largeReadOps = 0;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void accept(StatisticsData data) {
|
|
|
+ largeReadOps += data.largeReadOps;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Integer aggregate() {
|
|
|
+ return largeReadOps;
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2605,22 +2804,70 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|
|
* @return number of write operations
|
|
|
*/
|
|
|
public int getWriteOps() {
|
|
|
- return writeOps.get();
|
|
|
+ return visitAll(new StatisticsAggregator<Integer>() {
|
|
|
+ private int writeOps = 0;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void accept(StatisticsData data) {
|
|
|
+ writeOps += data.writeOps;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Integer aggregate() {
|
|
|
+ return writeOps;
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
+
|
|
|
@Override
|
|
|
public String toString() {
|
|
|
- return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
|
|
|
- + readOps + " read ops, " + largeReadOps + " large read ops, "
|
|
|
- + writeOps + " write ops";
|
|
|
+ return visitAll(new StatisticsAggregator<String>() {
|
|
|
+ private StatisticsData total = new StatisticsData(null);
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void accept(StatisticsData data) {
|
|
|
+ total.add(data);
|
|
|
+ }
|
|
|
+
|
|
|
+ public String aggregate() {
|
|
|
+ return total.toString();
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * Reset the counts of bytes to 0.
|
|
|
+ * Resets all statistics to 0.
|
|
|
+ *
|
|
|
+ * In order to reset, we add up all the thread-local statistics data, and
|
|
|
+ * set rootData to the negative of that.
|
|
|
+ *
|
|
|
+ * This may seem like a counterintuitive way to reset the statsitics. Why
|
|
|
+ * can't we just zero out all the thread-local data? Well, thread-local
|
|
|
+ * data can only be modified by the thread that owns it. If we tried to
|
|
|
+ * modify the thread-local data from this thread, our modification might get
|
|
|
+ * interleaved with a read-modify-write operation done by the thread that
|
|
|
+ * owns the data. That would result in our update getting lost.
|
|
|
+ *
|
|
|
+ * The approach used here avoids this problem because it only ever reads
|
|
|
+ * (not writes) the thread-local data. Both reads and writes to rootData
|
|
|
+ * are done under the lock, so we're free to modify rootData from any thread
|
|
|
+ * that holds the lock.
|
|
|
*/
|
|
|
public void reset() {
|
|
|
- bytesWritten.set(0);
|
|
|
- bytesRead.set(0);
|
|
|
+ visitAll(new StatisticsAggregator<Void>() {
|
|
|
+ private StatisticsData total = new StatisticsData(null);
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void accept(StatisticsData data) {
|
|
|
+ total.add(data);
|
|
|
+ }
|
|
|
+
|
|
|
+ public Void aggregate() {
|
|
|
+ total.negate();
|
|
|
+ rootData.add(total);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
/**
|