Browse Source

HDFS-15583. Backport DirectoryScanner improvements HDFS-14476, HDFS-14751 and HDFS-15048 to branch 3.2 and 3.1. Contributed by Stephen O'Donnell

S O'Donnell 4 years ago
parent
commit
5f34e3214e

+ 6 - 1
hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml

@@ -311,4 +311,9 @@
       <Method name="setInteractiveFormat" />
       <Method name="setInteractiveFormat" />
       <Bug pattern="ME_ENUM_FIELD_SETTER" />
       <Bug pattern="ME_ENUM_FIELD_SETTER" />
     </Match>
     </Match>
- </FindBugsFilter>
+    <Match>
+      <Class name="org.apache.hadoop.hdfs.server.datanode.DirectoryScanner" />
+      <Method name="reconcile" />
+      <Bug pattern="SWL_SLEEP_WITH_LOCK_HELD" />
+    </Match>
+</FindBugsFilter>

+ 25 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java

@@ -67,7 +67,7 @@ public class DirectoryScanner implements Runnable {
       + " starting at %s with interval of %dms";
       + " starting at %s with interval of %dms";
   private static final String START_MESSAGE_WITH_THROTTLE = START_MESSAGE
   private static final String START_MESSAGE_WITH_THROTTLE = START_MESSAGE
       + " and throttle limit of %dms/s";
       + " and throttle limit of %dms/s";
-
+  private static final int RECONCILE_BLOCKS_BATCH_SIZE = 1000;
   private final FsDatasetSpi<?> dataset;
   private final FsDatasetSpi<?> dataset;
   private final ExecutorService reportCompileThreadPool;
   private final ExecutorService reportCompileThreadPool;
   private final ScheduledExecutorService masterThread;
   private final ScheduledExecutorService masterThread;
@@ -299,7 +299,9 @@ public class DirectoryScanner implements Runnable {
    * Clear the current cache of diffs and statistics.
    * Clear the current cache of diffs and statistics.
    */
    */
   private void clear() {
   private void clear() {
-    diffs.clear();
+    synchronized (diffs) {
+      diffs.clear();
+    }
     stats.clear();
     stats.clear();
   }
   }
 
 
@@ -372,13 +374,25 @@ public class DirectoryScanner implements Runnable {
    */
    */
   @VisibleForTesting
   @VisibleForTesting
   public void reconcile() throws IOException {
   public void reconcile() throws IOException {
+    LOG.debug("reconcile start DirectoryScanning");
     scan();
     scan();
-    for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
-      String bpid = entry.getKey();
-      LinkedList<ScanInfo> diff = entry.getValue();
-      
-      for (ScanInfo info : diff) {
-        dataset.checkAndUpdate(bpid, info);
+    int loopCount = 0;
+    synchronized (diffs) {
+      for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
+        String bpid = entry.getKey();
+        LinkedList<ScanInfo> diff = entry.getValue();
+
+        for (ScanInfo info : diff) {
+          dataset.checkAndUpdate(bpid, info);
+          if (loopCount % RECONCILE_BLOCKS_BATCH_SIZE == 0) {
+            try {
+              Thread.sleep(2000);
+            } catch (InterruptedException e) {
+              // do nothing
+            }
+          }
+          loopCount++;
+        }
       }
       }
     }
     }
     if (!retainDiffs) clear();
     if (!retainDiffs) clear();
@@ -401,7 +415,9 @@ public class DirectoryScanner implements Runnable {
         Stats statsRecord = new Stats(bpid);
         Stats statsRecord = new Stats(bpid);
         stats.put(bpid, statsRecord);
         stats.put(bpid, statsRecord);
         LinkedList<ScanInfo> diffRecord = new LinkedList<ScanInfo>();
         LinkedList<ScanInfo> diffRecord = new LinkedList<ScanInfo>();
-        diffs.put(bpid, diffRecord);
+        synchronized(diffs) {
+          diffs.put(bpid, diffRecord);
+        }
         
         
         statsRecord.totalBlocks = blockpoolReport.length;
         statsRecord.totalBlocks = blockpoolReport.length;
         final List<ReplicaInfo> bl = dataset.getSortedFinalizedBlocks(bpid);
         final List<ReplicaInfo> bl = dataset.getSortedFinalizedBlocks(bpid);