Selaa lähdekoodia

HADOOP-12975. Add jitter to CachingGetSpaceUsed's thread (Elliott Clark via Colin P. McCabe)

(cherry picked from commit bf780406f2b30e627bdf36ac07973f6931f81106)
Colin Patrick Mccabe 9 vuotta sitten
vanhempi
commit
56c997a1a6

+ 23 - 5
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java

@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -43,6 +44,7 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
   protected final AtomicLong used = new AtomicLong();
   private final AtomicBoolean running = new AtomicBoolean(true);
   private final long refreshInterval;
+  private final long jitter;
   private final String dirPath;
   private Thread refreshUsed;
 
@@ -52,7 +54,10 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
    */
   public CachingGetSpaceUsed(CachingGetSpaceUsed.Builder builder)
       throws IOException {
-    this(builder.getPath(), builder.getInterval(), builder.getInitialUsed());
+    this(builder.getPath(),
+        builder.getInterval(),
+        builder.getJitter(),
+        builder.getInitialUsed());
   }
 
   /**
@@ -65,10 +70,12 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
    */
   CachingGetSpaceUsed(File path,
                       long interval,
+                      long jitter,
                       long initialUsed) throws IOException {
-    dirPath = path.getCanonicalPath();
-    refreshInterval = interval;
-    used.set(initialUsed);
+    this.dirPath = path.getCanonicalPath();
+    this.refreshInterval = interval;
+    this.jitter = jitter;
+    this.used.set(initialUsed);
   }
 
   void init() {
@@ -155,7 +162,18 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
     public void run() {
       while (spaceUsed.running()) {
         try {
-          Thread.sleep(spaceUsed.getRefreshInterval());
+          long refreshInterval = spaceUsed.refreshInterval;
+
+          if (spaceUsed.jitter > 0) {
+            long jitter = spaceUsed.jitter;
+            // add/subtract the jitter.
+            refreshInterval +=
+                ThreadLocalRandom.current()
+                                 .nextLong(-jitter, jitter);
+          }
+          // Make sure that after the jitter we didn't end up at 0.
+          refreshInterval = Math.max(refreshInterval, 1);
+          Thread.sleep(refreshInterval);
           // update the used variable
           spaceUsed.refresh();
         } catch (InterruptedException e) {

+ 7 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DU.java

@@ -34,12 +34,16 @@ public class DU extends CachingGetSpaceUsed {
   private DUShell duShell;
 
   @VisibleForTesting
-   public DU(File path, long interval, long initialUsed) throws IOException {
-    super(path, interval, initialUsed);
+  public DU(File path, long interval, long jitter, long initialUsed)
+      throws IOException {
+    super(path, interval, jitter, initialUsed);
   }
 
   public DU(CachingGetSpaceUsed.Builder builder) throws IOException {
-    this(builder.getPath(), builder.getInterval(), builder.getInitialUsed());
+    this(builder.getPath(),
+        builder.getInterval(),
+        builder.getJitter(),
+        builder.getInitialUsed());
   }
 
   @Override

+ 25 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/GetSpaceUsed.java

@@ -26,8 +26,11 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.util.concurrent.TimeUnit;
 
 public interface GetSpaceUsed {
+
+
   long getUsed() throws IOException;
 
   /**
@@ -37,11 +40,15 @@ public interface GetSpaceUsed {
     static final Logger LOG = LoggerFactory.getLogger(Builder.class);
 
     static final String CLASSNAME_KEY = "fs.getspaceused.classname";
+    static final String JITTER_KEY = "fs.getspaceused.jitterMillis";
+    static final long DEFAULT_JITTER = TimeUnit.MINUTES.toMillis(1);
+
 
     private Configuration conf;
     private Class<? extends GetSpaceUsed> klass = null;
     private File path = null;
     private Long interval = null;
+    private Long jitter = null;
     private Long initialUsed = null;
 
     public Configuration getConf() {
@@ -111,6 +118,24 @@ public interface GetSpaceUsed {
       return this;
     }
 
+
+    public long getJitter() {
+      if (jitter == null) {
+        Configuration configuration = this.conf;
+
+        if (configuration == null) {
+          return DEFAULT_JITTER;
+        }
+        return configuration.getLong(JITTER_KEY, DEFAULT_JITTER);
+      }
+      return jitter;
+    }
+
+    public Builder setJitter(Long jit) {
+      this.jitter = jit;
+      return this;
+    }
+
     public GetSpaceUsed build() throws IOException {
       GetSpaceUsed getSpaceUsed = null;
       try {

+ 5 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/WindowsGetSpaceUsed.java

@@ -31,10 +31,11 @@ import java.io.IOException;
 @InterfaceStability.Evolving
 public class WindowsGetSpaceUsed extends CachingGetSpaceUsed {
 
-
-  public WindowsGetSpaceUsed(CachingGetSpaceUsed.Builder builder)
-      throws IOException {
-    super(builder.getPath(), builder.getInterval(), builder.getInitialUsed());
+  WindowsGetSpaceUsed(CachingGetSpaceUsed.Builder builder) throws IOException {
+    super(builder.getPath(),
+        builder.getInterval(),
+        builder.getJitter(),
+        builder.getInitialUsed());
   }
 
   /**

+ 5 - 5
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDU.java

@@ -79,7 +79,7 @@ public class TestDU extends TestCase {
 
     Thread.sleep(5000); // let the metadata updater catch up
 
-    DU du = new DU(file, 10000, -1);
+    DU du = new DU(file, 10000, 0, -1);
     du.init();
     long duSize = du.getUsed();
     du.close();
@@ -89,7 +89,7 @@ public class TestDU extends TestCase {
         writtenSize <= (duSize + slack));
 
     //test with 0 interval, will not launch thread
-    du = new DU(file, 0, -1);
+    du = new DU(file, 0, 1, -1);
     du.init();
     duSize = du.getUsed();
     du.close();
@@ -99,7 +99,7 @@ public class TestDU extends TestCase {
         writtenSize <= (duSize + slack));
 
     //test without launching thread
-    du = new DU(file, 10000, -1);
+    du = new DU(file, 10000, 0, -1);
     du.init();
     duSize = du.getUsed();
 
@@ -112,7 +112,7 @@ public class TestDU extends TestCase {
     assertTrue(file.createNewFile());
     Configuration conf = new Configuration();
     conf.setLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, 10000L);
-    DU du = new DU(file, 10000L, -1);
+    DU du = new DU(file, 10000L, 0, -1);
     du.incDfsUsed(-Long.MAX_VALUE);
     long duSize = du.getUsed();
     assertTrue(String.valueOf(duSize), duSize >= 0L);
@@ -121,7 +121,7 @@ public class TestDU extends TestCase {
   public void testDUSetInitialValue() throws IOException {
     File file = new File(DU_DIR, "dataX");
     createFile(file, 8192);
-    DU du = new DU(file, 3000, 1024);
+    DU du = new DU(file, 3000, 0, 1024);
     du.init();
     assertTrue("Initial usage setting not honored", du.getUsed() == 1024);