Explorar o código

HDFS-14476. lock too long when fix inconsistent blocks between disk and in-memory. Contributed by Sean Chow.

Masatake Iwasaki %!s(int64=5) %!d(string=hai) anos
pai
achega
01ed6832f4

+ 30 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java

@@ -63,6 +63,7 @@ import org.apache.hadoop.util.Time;
 public class DirectoryScanner implements Runnable {
   private static final Log LOG = LogFactory.getLog(DirectoryScanner.class);
   private static final int MILLIS_PER_SECOND = 1000;
+  private static final int RECONCILE_BLOCKS_BATCH_SIZE = 1000;
   private static final String START_MESSAGE =
       "Periodic Directory Tree Verification scan"
       + " starting at %s with interval of %dms";
@@ -491,7 +492,9 @@ public class DirectoryScanner implements Runnable {
    * Clear the current cache of diffs and statistics.
    */
   private void clear() {
-    diffs.clear();
+    synchronized (diffs) {
+      diffs.clear();
+    }
     stats.clear();
   }
 
@@ -564,14 +567,30 @@ public class DirectoryScanner implements Runnable {
    */
   @VisibleForTesting
   void reconcile() throws IOException {
+    LOG.debug("reconcile start DirectoryScanning");
     scan();
-    for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
-      String bpid = entry.getKey();
-      LinkedList<ScanInfo> diff = entry.getValue();
-      
-      for (ScanInfo info : diff) {
-        dataset.checkAndUpdate(bpid, info.getBlockId(), info.getBlockFile(),
-            info.getMetaFile(), info.getVolume());
+
+    // HDFS-14476: run checkAndUpadte with batch to avoid holding the lock
+    // too long
+    int loopCount = 0;
+    synchronized (diffs) {
+      for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
+        String bpid = entry.getKey();
+        LinkedList<ScanInfo> diff = entry.getValue();
+
+        for (ScanInfo info : diff) {
+          dataset.checkAndUpdate(bpid, info.getBlockId(), info.getBlockFile(),
+              info.getMetaFile(), info.getVolume());
+
+          if (loopCount % RECONCILE_BLOCKS_BATCH_SIZE == 0) {
+              try {
+                Thread.sleep(2000);
+              } catch (InterruptedException e) {
+                // do nothing
+              }
+          }
+          loopCount++;
+        }
       }
     }
     if (!retainDiffs) clear();
@@ -594,7 +613,9 @@ public class DirectoryScanner implements Runnable {
         Stats statsRecord = new Stats(bpid);
         stats.put(bpid, statsRecord);
         LinkedList<ScanInfo> diffRecord = new LinkedList<ScanInfo>();
-        diffs.put(bpid, diffRecord);
+        synchronized (diffs) {
+          diffs.put(bpid, diffRecord);
+        }
         
         statsRecord.totalBlocks = blockpoolReport.length;
         final List<FinalizedReplica> bl = dataset.getFinalizedBlocks(bpid);