
HDFS-2379. Merging change r1196456 from branch-1

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.0@1240440 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 13 years ago
Parent
Commit
a03295478e

+ 3 - 0
CHANGES.txt

@@ -18,6 +18,9 @@ Release 1.0.1 - 2012.01.30
 
     HADOOP-7470. Move up to Jackson 1.8.8.  (Enis Soztutar via szetszwo)
 
+    HDFS-2379. Allow block reports to proceed without holding FSDataset lock.
+    (todd via suresh)
+
   BUG FIXES
 
     HADOOP-7960. Port HADOOP-5203 to branch-1, build version comparison is too 

+ 64 - 35
src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -248,7 +248,16 @@ public class DataNode extends Configured
   public final static String DATA_DIR_PERMISSION_KEY = 
     "dfs.datanode.data.dir.perm";
   private static final String DEFAULT_DATA_DIR_PERMISSION = "755";
-  
+
+  // Thresholds for when we start to log when a block report is
+  // taking a long time to generate. Under heavy disk load and
+  // memory pressure, it's normal for block reports to take
+  // several minutes, since they cause many disk seeks.
+  private static final long LATE_BLOCK_REPORT_WARN_THRESHOLD =
+      10 * 60 * 1000; // 10m
+  private static final long LATE_BLOCK_REPORT_INFO_THRESHOLD =
+      3 * 60 * 1000; // 3m
+
   // For InterDataNodeProtocol
   public Server ipcServer;
 
@@ -713,6 +722,8 @@ public class DataNode extends Configured
       namenode.blocksBeingWrittenReport(dnRegistration, blocksBeingWritten);
     }
     // random short delay - helps scatter the BR from all DNs
+    // - but we can start generating the block report immediately
+    data.requestAsyncBlockReport();
     scheduleBlockReport(initialBlockReportDelay);
   }
 
@@ -937,42 +948,60 @@ public class DataNode extends Configured
 
         // Send latest blockinfo report if timer has expired.
         if (startTime - lastBlockReport > blockReportInterval) {
-          
-          // Create block report
-          long brCreateStartTime = now();
-          Block[] bReport = data.getBlockReport();
-          
-          // Send block report
-          long brSendStartTime = now();
-          DatanodeCommand cmd = namenode.blockReport(dnRegistration,
-                  BlockListAsLongs.convertToArrayLongs(bReport));
-          
-          // Log the block report processing stats from Datanode perspective
-          long brSendCost = now() - brSendStartTime;
-          long brCreateCost = brSendStartTime - brCreateStartTime;
-          myMetrics.addBlockReport(brSendCost);
-          LOG.info("BlockReport of " + bReport.length
-              + " blocks took " + brCreateCost + " msec to generate and "
-              + brSendCost + " msecs for RPC and NN processing");
-
-          //
-          // If we have sent the first block report, then wait a random
-          // time before we start the periodic block reports.
-          //
-          if (resetBlockReportTime) {
-            lastBlockReport = startTime - R.nextInt((int)(blockReportInterval));
-            resetBlockReportTime = false;
+          if (data.isAsyncBlockReportReady()) {
+            // Create block report
+            long brCreateStartTime = now();
+            Block[] bReport = data.retrieveAsyncBlockReport();
+            
+            // Send block report
+            long brSendStartTime = now();
+            DatanodeCommand cmd = namenode.blockReport(dnRegistration,
+                    BlockListAsLongs.convertToArrayLongs(bReport));
+            
+            // Log the block report processing stats from Datanode perspective
+            long brSendCost = now() - brSendStartTime;
+            long brCreateCost = brSendStartTime - brCreateStartTime;
+            myMetrics.addBlockReport(brSendCost);
+            LOG.info("BlockReport of " + bReport.length
+                + " blocks took " + brCreateCost + " msec to generate and "
+                + brSendCost + " msecs for RPC and NN processing");
+
+            //
+            // If we have sent the first block report, then wait a random
+            // time before we start the periodic block reports.
+            //
+            if (resetBlockReportTime) {
+              lastBlockReport = startTime -
+                  R.nextInt((int)(blockReportInterval));
+              resetBlockReportTime = false;
+            } else {
+              /* say the last block report was at 8:20:14. The current report 
+               * should have started around 9:20:14 (default 1 hour interval). 
+               * If current time is :
+               *   1) normal like 9:20:18, next report should be at 10:20:14
+               *   2) unexpected like 11:35:43, next report should be at
+               *      12:20:14
+               */
+              lastBlockReport += (now() - lastBlockReport) / 
+                                 blockReportInterval * blockReportInterval;
+            }
+            processCommand(cmd);
           } else {
-            /* say the last block report was at 8:20:14. The current report 
-             * should have started around 9:20:14 (default 1 hour interval). 
-             * If current time is :
-             *   1) normal like 9:20:18, next report should be at 10:20:14
-             *   2) unexpected like 11:35:43, next report should be at 12:20:14
-             */
-            lastBlockReport += (now() - lastBlockReport) / 
-                               blockReportInterval * blockReportInterval;
+            data.requestAsyncBlockReport();
+            if (lastBlockReport > 0) { // this isn't the first report
+              long waitingFor =
+                  startTime - lastBlockReport - blockReportInterval;
+              String msg = "Block report is due, and has been waiting for " +
+                  (waitingFor/1000) + " seconds...";
+              if (waitingFor > LATE_BLOCK_REPORT_WARN_THRESHOLD) {
+                LOG.warn(msg);
+              } else if (waitingFor > LATE_BLOCK_REPORT_INFO_THRESHOLD) {
+                LOG.info(msg);
+              } else if (LOG.isDebugEnabled()) {
+                LOG.debug(msg);
+              }
+            }
           }
-          processCommand(cmd);
         }
 
         // start block scanner

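The rescheduling arithmetic retained in the hunk above is worth tracing: `lastBlockReport += (now() - lastBlockReport) / blockReportInterval * blockReportInterval` advances the timestamp by a whole multiple of the interval, so the schedule stays anchored to the original grid even when a report is very late. A minimal, self-contained sketch (the class and helper names are ours; the example times and the one-hour interval come from the comment in the patch):

```java
// Demonstrates the schedule-anchoring arithmetic from offerService():
//   lastBlockReport += (now - lastBlockReport) / interval * interval;
// Integer division rounds down, so the next due time stays on the
// original grid (..., 8:20:14, 9:20:14, 10:20:14, ...) however late we are.
public class BlockReportSchedule {
  static long advance(long lastBlockReport, long now, long interval) {
    return lastBlockReport + (now - lastBlockReport) / interval * interval;
  }

  public static void main(String[] args) {
    long hour = 60 * 60 * 1000L;
    long last = hms(8, 20, 14);           // last report at 8:20:14
    // Case 1 from the comment: current time 9:20:18 (slightly late).
    // lastBlockReport becomes 9:20:14, so the next report is due 10:20:14.
    System.out.println(fmt(advance(last, hms(9, 20, 18), hour) + hour));
    // Case 2: current time 11:35:43 (very late).
    // lastBlockReport becomes 11:20:14, so the next report is due 12:20:14.
    System.out.println(fmt(advance(last, hms(11, 35, 43), hour) + hour));
  }

  static long hms(int h, int m, int s) { return ((h * 60L + m) * 60 + s) * 1000; }

  static String fmt(long t) {
    long s = t / 1000;
    return String.format("%02d:%02d:%02d", s / 3600, (s % 3600) / 60, s % 60);
  }
}
```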
+ 273 - 49
src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java

@@ -32,6 +32,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.TreeSet;
 
 import javax.management.NotCompliantMBeanException;
@@ -66,22 +67,23 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
   /** Find the metadata file for the specified block file.
    * Return the generation stamp from the name of the metafile.
    */
-  private static long getGenerationStampFromFile(File[] listdir, File blockFile) {
-    String blockName = blockFile.getName();
+  static long getGenerationStampFromFile(File[] listdir, File blockFile) {
+    String blockNamePrefix = blockFile.getName() + "_";
+    // blockNamePrefix is blk_12345_
+    // the path we're looking for looks like blk_12345_GENSTAMP.meta
+
     for (int j = 0; j < listdir.length; j++) {
       String path = listdir[j].getName();
-      if (!path.startsWith(blockName)) {
-        continue;
-      }
-      String[] vals = path.split("_");
-      if (vals.length != 3) {     // blk, blkid, genstamp.meta
+      if (!path.startsWith(blockNamePrefix)) {
         continue;
       }
-      String[] str = vals[2].split("\\.");
-      if (str.length != 2) {
+      if (!path.endsWith(".meta")) {
         continue;
       }
-      return Long.parseLong(str[0]);
+      
+      String metaPart = path.substring(blockNamePrefix.length(),
+          path.length() - METADATA_EXTENSION_LENGTH);
+      return Long.parseLong(metaPart);
     }
     DataNode.LOG.warn("Block " + blockFile + 
                       " does not have a metafile!");
@@ -212,30 +214,6 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
       return children[ lastChildIdx ].addBlock(b, src, true, false); 
     }
 
-    /**
-     * Populate the given blockSet with any child blocks
-     * found at this node.
-     */
-    public void getBlockInfo(TreeSet<Block> blockSet) {
-      if (children != null) {
-        for (int i = 0; i < children.length; i++) {
-          children[i].getBlockInfo(blockSet);
-        }
-      }
-
-      File blockFiles[] = dir.listFiles();
-      if (blockFiles != null) {
-        for (int i = 0; i < blockFiles.length; i++) {
-          if (Block.isBlockFilename(blockFiles[i])) {
-            long genStamp = FSDataset.getGenerationStampFromFile(blockFiles,
-                blockFiles[i]);
-            blockSet.add(new Block(blockFiles[i], blockFiles[i].length(),
-                genStamp));
-          }
-        }
-      }
-    }
-
     /**
      * Populate the given blockSet with any child blocks
      * found at this node. With each block, return the full path
@@ -525,9 +503,43 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
       DiskChecker.checkDir(tmpDir);
       DiskChecker.checkDir(blocksBeingWritten);
     }
-      
-    void getBlockInfo(TreeSet<Block> blockSet) {
-      dataDir.getBlockInfo(blockSet);
+
+    void scanBlockFilesInconsistent(Map<Block, File> results) {
+      scanBlockFilesInconsistent(dataDir.dir, results);
+    }
+
+    /**
+     * Recursively scan the given directory, generating a map where
+     * each key is a discovered block, and the value is the actual
+     * file for that block.
+     *
+     * This is unsynchronized since it can take quite some time
+     * when inodes and dentries have been paged out of cache.
+     * After the scan is completed, we reconcile it with
+     * the current disk state in reconcileRoughBlockScan.
+     */
+    private void scanBlockFilesInconsistent(
+        File dir, Map<Block, File> results) {
+      File filesInDir[] = dir.listFiles();
+      if (filesInDir != null) {
+        for (File f : filesInDir) {
+          if (Block.isBlockFilename(f)) {
+            long blockLen = f.length();
+            if (blockLen == 0 && !f.exists()) {
+              // length 0 could indicate a race where this file was removed
+              // while we were in the middle of generating the report.
+              continue;
+            }
+            long genStamp = FSDataset.getGenerationStampFromFile(filesInDir, f);
+            Block b = new Block(f, blockLen, genStamp);
+            results.put(b, f);
+          } else if (f.getName().startsWith("subdir")) {
+            // the startsWith check is much faster than the
+            // stat() call invoked by isDirectory()
+            scanBlockFilesInconsistent(f, results);
+          }
+        }
+      }
     }
     
     void getBlocksBeingWrittenInfo(TreeSet<Block> blockSet) {
@@ -685,13 +697,20 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
       }
       return remaining;
     }
+
+    void scanBlockFilesInconsistent(Map<Block, File> results) {
+      // Make a local consistent copy of the volume list, since
+      // it might change due to a disk failure
+      FSVolume volumesCopy[];
+      synchronized (this) {
+        volumesCopy = Arrays.copyOf(volumes, volumes.length);
+      }
       
-    synchronized void getBlockInfo(TreeSet<Block> blockSet) {
-      for (int idx = 0; idx < volumes.length; idx++) {
-        volumes[idx].getBlockInfo(blockSet);
+      for (FSVolume vol : volumesCopy) {
+        vol.scanBlockFilesInconsistent(results);
       }
     }
-      
+    
     synchronized void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap) {
       for (int idx = 0; idx < volumes.length; idx++) {
         volumes[idx].getVolumeMap(volumeMap);
@@ -772,6 +791,8 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
 
   //Find better place?
   public static final String METADATA_EXTENSION = ".meta";
+  public static final int METADATA_EXTENSION_LENGTH =
+    METADATA_EXTENSION.length();
   public static final short METADATA_VERSION = 1;
   
 
@@ -926,6 +947,9 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
   private int validVolsRequired;
   FSDatasetAsyncDiskService asyncDiskService;
 
+  private final AsyncBlockReport asyncBlockReport;
+
+
   /**
    * An FSDataset has a directory where it loads its data files.
    */
@@ -966,6 +990,8 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     }
     volumes = new FSVolumeSet(volArray);
     volumes.getVolumeMap(volumeMap);
+    asyncBlockReport = new AsyncBlockReport(this);
+    asyncBlockReport.start();
     File[] roots = new File[storage.getNumStorageDirs()];
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
       roots[idx] = storage.getStorageDir(idx).getCurrentDir();
@@ -1656,18 +1682,125 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     return blockTable;
   }
   
+  @Override
+  public void requestAsyncBlockReport() {
+    asyncBlockReport.request();
+  }
+
+  @Override
+  public boolean isAsyncBlockReportReady() {
+    return asyncBlockReport.isReady();
+  }
+  
+  @Override
+  public Block[] retrieveAsyncBlockReport() {
+    HashMap<Block, File> seenOnDisk = asyncBlockReport.getAndReset();
+    return reconcileRoughBlockScan(seenOnDisk);
+  }
+  
   /**
-   * Return a table of block data
+   * Return a table of block data. This method is synchronous, and is used
+   * by tests and during block scanner startup.
    */
   public Block[] getBlockReport() {
-    TreeSet<Block> blockSet = new TreeSet<Block>();
-    volumes.getBlockInfo(blockSet);
-    Block blockTable[] = new Block[blockSet.size()];
-    int i = 0;
-    for (Iterator<Block> it = blockSet.iterator(); it.hasNext(); i++) {
-      blockTable[i] = it.next();
+    long st = System.currentTimeMillis();
+    HashMap<Block, File> seenOnDisk = roughBlockScan();
+    // the above results are inconsistent since modifications
+    // happened concurrently. Now check any diffs
+    DataNode.LOG.info("Generated rough (lockless) block report in "
+        + (System.currentTimeMillis() - st) + " ms");
+    return reconcileRoughBlockScan(seenOnDisk);
+  }
+
+  private Block[] reconcileRoughBlockScan(HashMap<Block, File> seenOnDisk) {
+    Set<Block> blockReport;
+    synchronized (this) {
+      long st = System.currentTimeMillis();
+      // broken out to a static method to simplify testing
+      reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+      DataNode.LOG.info(
+          "Reconciled asynchronous block report against current state in " +
+          (System.currentTimeMillis() - st) + " ms");
+      
+      blockReport = seenOnDisk.keySet();
+    }
+
+    return blockReport.toArray(new Block[0]);
+  }
+
+  /**
+   * Scan the blocks in the dataset on disk, without holding any
+   * locks. This generates a "rough" block report, since there
+   * may be concurrent modifications to the disk structure.
+   */
+  HashMap<Block, File> roughBlockScan() {
+    int expectedNumBlocks;
+    synchronized (this) {
+      expectedNumBlocks = volumeMap.size();
+    }
+    HashMap<Block, File> seenOnDisk =
+        new HashMap<Block, File>(expectedNumBlocks, 1.1f);
+    volumes.scanBlockFilesInconsistent(seenOnDisk);
+    return seenOnDisk;
+  }
+
+  static void reconcileRoughBlockScan(
+      Map<Block, File> seenOnDisk,
+      Map<Block, DatanodeBlockInfo> volumeMap,
+      Map<Block,ActiveFile> ongoingCreates) {
+    
+    int numDeletedAfterScan = 0;
+    int numAddedAfterScan = 0;
+    int numOngoingIgnored = 0;
+    
+    // remove anything seen on disk that's no longer in the memory map,
+    // or got reopened while we were scanning
+    Iterator<Map.Entry<Block, File>> iter = seenOnDisk.entrySet().iterator();
+    while (iter.hasNext()) {
+      Map.Entry<Block, File> entry = iter.next();
+      Block b = entry.getKey();
+
+      if (!volumeMap.containsKey(b) || ongoingCreates.containsKey(b)) {
+        File blockFile = entry.getValue();
+        File metaFile = getMetaFile(blockFile, b);
+        if (!blockFile.exists() || !metaFile.exists()) {
+          // the block was deleted (or had its generation stamp changed)
+          // after it was scanned on disk... If the genstamp changed,
+          // it will be added below when we scan volumeMap
+          iter.remove();
+          numDeletedAfterScan++;
+        }
+      }
+    }
+
+    // add anything from the in-memory map that wasn't seen on disk,
+    // if and only if the file is now verifiably on disk.
+    for (Map.Entry<Block, DatanodeBlockInfo> entry : volumeMap.entrySet()) {
+      Block b = entry.getKey();
+      if (ongoingCreates.containsKey(b)) {
+        // don't add these to block reports
+        numOngoingIgnored++;
+        continue;
+      }
+      DatanodeBlockInfo info = entry.getValue();
+      if (!seenOnDisk.containsKey(b) && info.getFile().exists()) {
+        // add a copy, and use the length from disk instead of from memory
+        Block toAdd =  new Block(
+            b.getBlockId(), info.getFile().length(), b.getGenerationStamp());
+        seenOnDisk.put(toAdd, info.getFile());
+        numAddedAfterScan++;
+      }
+      // if the file is in memory but _not_ on disk, this is the situation
+      // in which an administrator accidentally "rm -rf"ed part of a data
+      // directory. We should _not_ report these blocks.
+    }
+    
+    if (numDeletedAfterScan + numAddedAfterScan + numOngoingIgnored > 0) {
+      DataNode.LOG.info("Reconciled asynchronous block scan with filesystem. " +
+          numDeletedAfterScan + " blocks concurrently deleted during scan, " +
+          numAddedAfterScan + " blocks concurrently added during scan, " +
+          numOngoingIgnored + " ongoing creations ignored");
     }
-    return blockTable;
   }
 
   /**
@@ -1937,6 +2070,10 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
       asyncDiskService.shutdown();
     }
 
+    if (asyncBlockReport != null) {
+      asyncBlockReport.shutdown();
+    }
+    
     if(volumes != null) {
       for (FSVolume volume : volumes.volumes) {
         if(volume != null) {
@@ -2031,4 +2168,91 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
       return info;
     }
   }
+  
+  /**
+   * Thread which handles generating "rough" block reports in the background.
+   * Callers should call request(), and then poll isReady() while the
+   * work happens. When isReady() returns true, getAndReset() may be
+   * called to retrieve the results.
+   */
+  static class AsyncBlockReport implements Runnable {
+    private final Thread thread;
+    private final FSDataset fsd;
+
+    boolean requested = false;
+    boolean shouldRun = true;
+    private HashMap<Block, File> scan = null;
+    
+    AsyncBlockReport(FSDataset fsd) {
+      this.fsd = fsd;
+      thread = new Thread(this, "Async Block Report Generator");
+      thread.setDaemon(true);
+    }
+    
+    void start() {
+      thread.start();
+    }
+    
+    synchronized void shutdown() {
+      shouldRun = false;
+      thread.interrupt();
+    }
+
+    synchronized boolean isReady() {
+      return scan != null;
+    }
+
+    synchronized HashMap<Block, File> getAndReset() {
+      if (!isReady()) {
+        throw new IllegalStateException("report not ready!");
+      }
+      HashMap<Block, File> ret = scan;
+      scan = null;
+      requested = false;
+      return ret;
+    }
+
+    synchronized void request() {
+      requested = true;
+      notifyAll();
+    }
+
+    @Override
+    public void run() {
+      while (shouldRun) {
+        try {
+          waitForReportRequest();
+          assert requested && scan == null;
+          
+          DataNode.LOG.info("Starting asynchronous block report scan");
+          long st = System.currentTimeMillis();
+          HashMap<Block, File> result = fsd.roughBlockScan();
+          DataNode.LOG.info("Finished asynchronous block report scan in "
+              + (System.currentTimeMillis() - st) + "ms");
+          
+          synchronized (this) {
+            assert scan == null;
+            this.scan = result;
+          }
+        } catch (InterruptedException ie) {
+          // interrupted to end scanner
+        } catch (Throwable t) {
+          DataNode.LOG.error("Async Block Report thread caught exception", t);
+          try {
+            // Avoid busy-looping in the case that we have entered some invalid
+            // state -- don't want to flood the error log with exceptions.
+            Thread.sleep(2000);
+          } catch (InterruptedException e) {
+          }
+        }
+      }
+    }
+
+    private synchronized void waitForReportRequest()
+        throws InterruptedException {
+      while (!(requested && scan == null)) {
+        wait(5000);
+      }
+    }
+  }
 }

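AsyncBlockReport coordinates with the DataNode's service thread through a small wait/notify handshake: request() sets a flag and wakes the worker, the worker runs the expensive scan with no dataset lock held, publishes the result under its own monitor, and getAndReset() both hands the result over and re-arms the cycle. A self-contained model of that handshake with a placeholder scan (the class name, the String-to-Long payload standing in for HashMap<Block, File>, and slowScan() are ours, not Hadoop's):

```java
import java.util.HashMap;
import java.util.Map;

// Minimal model of the AsyncBlockReport handshake added above: a daemon
// thread waits for a request, runs a slow scan outside any shared lock,
// and publishes the result for the caller to poll.
public class AsyncReportDemo implements Runnable {
  private boolean requested = false;
  private Map<String, Long> scan = null; // stands in for HashMap<Block, File>

  synchronized void request() { requested = true; notifyAll(); }
  synchronized boolean isReady() { return scan != null; }

  synchronized Map<String, Long> getAndReset() {
    if (scan == null) throw new IllegalStateException("report not ready!");
    Map<String, Long> ret = scan;
    scan = null;        // re-arm: a new request must be made
    requested = false;
    return ret;
  }

  @Override public void run() {
    try {
      while (true) {
        synchronized (this) {
          while (!(requested && scan == null)) wait();
        }
        Map<String, Long> result = slowScan(); // no lock held here
        synchronized (this) { scan = result; }
      }
    } catch (InterruptedException ie) { /* shutdown */ }
  }

  private Map<String, Long> slowScan() throws InterruptedException {
    Thread.sleep(100); // simulate many disk seeks
    Map<String, Long> m = new HashMap<String, Long>();
    m.put("blk_12345", 65536L);
    return m;
  }

  public static void main(String[] args) throws InterruptedException {
    AsyncReportDemo demo = new AsyncReportDemo();
    Thread t = new Thread(demo, "Async Block Report Generator");
    t.setDaemon(true);
    t.start();
    demo.request();
    while (!demo.isReady()) Thread.sleep(10); // poll, as offerService() does
    System.out.println(demo.getAndReset());   // {blk_12345=65536}
    t.interrupt();
  }
}
```

The key property, as in the patch, is that only the brief publish and hand-off synchronize; the scan itself runs lock-free, which is exactly what lets block reports proceed without holding the FSDataset lock.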
+ 23 - 0
src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java

@@ -237,6 +237,29 @@ public interface FSDatasetInterface extends FSDatasetMBean {
    */
   public Block[] getBlockReport();
   
+  /**
+   * Request that a block report be prepared.
+   */
+  public void requestAsyncBlockReport();
+
+  /**
+   * @return true if an asynchronous block report is ready
+   */
+  public boolean isAsyncBlockReportReady();
+
+  /**
+   * Retrieve an asynchronously prepared block report. Callers should first
+   * call {@link #requestAsyncBlockReport()}, and then poll
+   * {@link #isAsyncBlockReportReady()} until it returns true.
+   *
+   * Retrieving the asynchronous block report also resets it; a new
+   * one must be prepared before this method may be called again.
+   *
+   * @throws IllegalStateException if an async report is not ready
+   */
+  public Block[] retrieveAsyncBlockReport();
+
+  
   /**
    * Returns the blocks being written report 
    * @return - the blocks being written report

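Taken together, the three new methods define a request/poll/retrieve contract for any FSDatasetInterface implementation. A hedged sketch of a conforming caller (the helper class and method below are ours, it compiles only against the branch-1 classpath, and error handling is elided; the real DataNode polls from its heartbeat loop rather than sleeping):

```java
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;

// Illustrative caller for the new contract: request first, poll until
// ready, then retrieve (which also resets the prepared report).
class BlockReportPoller {
  static Block[] awaitBlockReport(FSDatasetInterface data)
      throws InterruptedException {
    data.requestAsyncBlockReport();
    while (!data.isAsyncBlockReportReady()) {
      Thread.sleep(1000);
    }
    return data.retrieveAsyncBlockReport();
  }
}
```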
+ 15 - 0
src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -308,6 +308,21 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
     }
     return blockTable;
   }
+  
+  @Override
+  public void requestAsyncBlockReport() {
+  }
+
+  @Override
+  public boolean isAsyncBlockReportReady() {
+    return true;
+  }
+
+  @Override
+  public Block[] retrieveAsyncBlockReport() {
+    return getBlockReport();
+  }
+
 
   public long getCapacity() throws IOException {
     return storage.getCapacity();

+ 270 - 0
src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReportGeneration.java

@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.ActiveFile;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.AsyncBlockReport;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestBlockReportGeneration {
+  private static final long BLKID = 12345L;
+  private static final long GENSTAMP = 1000L;
+  private static final long LEN = 65536L;
+  
+  private static final Block FAKE_BLK =
+    new Block(BLKID, LEN, GENSTAMP);
+
+  
+  static final File TEST_DIR = new File(
+      System.getProperty("test.build.data") + File.separatorChar
+      + "TestBlockReportGeneration");
+  
+  Map<Block, File> seenOnDisk = new HashMap<Block, File>();
+  Map<Block, DatanodeBlockInfo> volumeMap =
+      new HashMap<Block, DatanodeBlockInfo>();
+  Map<Block, ActiveFile> ongoingCreates = new HashMap<Block, ActiveFile>();
+
+  
+  @Before
+  public void cleanupTestDir() throws IOException {
+    FileUtil.fullyDelete(TEST_DIR);
+    assertTrue(TEST_DIR.mkdirs());
+  }
+  
+  @Test
+  public void testEmpty() {
+    FSDataset.reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+    assertTrue(seenOnDisk.isEmpty());
+  }
+  
+  /**
+   * Test for case where for some reason there's a block on disk
+   * that got lost in volumeMap - we want to report this.
+   */
+  @Test
+  public void testOnDiskButNotMemory() {
+    fakeSeenByScan(FAKE_BLK);
+    fakeBlockOnDisk(FAKE_BLK);
+    
+    FSDataset.reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+    // should still be in map, since it was seen to still exist on disk
+    // (exists returns true)
+    assertTrue(seenOnDisk.containsKey(FAKE_BLK));
+  }
+  
+  /**
+   * Test for case where we lost a block from disk - e.g. a user
+   * accidentally "rm -rf"ed a data dir.
+   */
+  @Test
+  public void testInMemoryButNotOnDisk() {
+    fakeInVolumeMap(FAKE_BLK);
+    
+    assertFalse(seenOnDisk.containsKey(FAKE_BLK));
+    assertTrue(volumeMap.containsKey(FAKE_BLK));
+    FSDataset.reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+    // should not be added to the map, since it's truly not on disk
+    assertFalse(seenOnDisk.containsKey(FAKE_BLK));
+  }
+  
+  /**
+   * Test for case where we concurrently removed a block, so it is
+   * seen in scan, but during reconciliation is no longer on disk.
+   */
+  @Test
+  public void testRemovedAfterScan() {
+    fakeSeenByScan(FAKE_BLK);
+    
+    assertTrue(seenOnDisk.containsKey(FAKE_BLK));
+    assertFalse(volumeMap.containsKey(FAKE_BLK));
+    FSDataset.reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+    // should be removed from the map since .exists() returns false
+    assertFalse(seenOnDisk.containsKey(FAKE_BLK));
+  }
+    
+  /**
+   * Test for case where we concurrently added a block, so it is
+   * not seen in scan, but is in volumeMap and on disk during
+   * reconciliation.
+   */
+  @Test
+  public void testAddedAfterScan() {
+    fakeInVolumeMap(FAKE_BLK);
+    fakeBlockOnDisk(FAKE_BLK);
+    
+    assertFalse(seenOnDisk.containsKey(FAKE_BLK));
+    assertTrue(volumeMap.containsKey(FAKE_BLK));
+    FSDataset.reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+    // should be added, since it's found on disk when reconciling
+    assertTrue(seenOnDisk.containsKey(FAKE_BLK));
+  }
+  
+  /**
+   * Test for case where we concurrently changed the generation stamp
+   * of a block. So, we scanned it with one GS, but at reconciliation
+   * time it has a new GS.
+   */
+  @Test
+  public void testGenstampChangedAfterScan() {
+    Block oldGenStamp = FAKE_BLK;
+    Block newGenStamp = new Block(FAKE_BLK);
+    newGenStamp.setGenerationStamp(GENSTAMP + 1);
+
+    fakeSeenByScan(oldGenStamp);
+    fakeInVolumeMap(newGenStamp);
+    fakeBlockOnDisk(newGenStamp);
+    
+    assertTrue(seenOnDisk.containsKey(oldGenStamp));
+
+    FSDataset.reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+    // old genstamp should not be added
+    assertFalse(seenOnDisk.containsKey(oldGenStamp));
+    // new genstamp should be added, since it's found on disk when reconciling
+    assertTrue(seenOnDisk.containsKey(newGenStamp));
+  }
+  
+  @Test
+  public void testGetGenerationStampFromFile() {
+    File[] fileList = new File[] {
+        // include some junk files which should be ignored
+        new File("blk_-1362850638739812068_5351.meta.foo"),
+        new File("blk_-1362850638739812068_5351meta"),
+        // the real dir
+        new File("."),
+        new File(".."),
+        new File("blk_-1362850638739812068"),
+        new File("blk_-1362850638739812068_5351.meta"),
+        new File("blk_1453973893701037484"),
+        new File("blk_1453973893701037484_4804.meta"),
+    };
+    
+    assertEquals(4804, FSDataset.getGenerationStampFromFile(fileList,
+        new File("blk_1453973893701037484")));
+    // try a prefix of a good block ID
+    assertEquals(Block.GRANDFATHER_GENERATION_STAMP,
+        FSDataset.getGenerationStampFromFile(fileList,
+            new File("blk_145397389370103")));
+
+    assertEquals(Block.GRANDFATHER_GENERATION_STAMP,
+        FSDataset.getGenerationStampFromFile(fileList,
+            new File("blk_99999")));
+    
+    // pass nonsense value
+    assertEquals(Block.GRANDFATHER_GENERATION_STAMP,
+        FSDataset.getGenerationStampFromFile(fileList,
+            new File("blk_")));
+  }
+
+  
+  /**
+   * Test case for blocks being created - these are not seen by the
+   * scan since they're in the current/ dir, not bbw/ -- but
+   * they are in volumeMap and ongoingCreates. These should not
+   * be reported.
+   */
+  @Test
+  public void testFileBeingCreated() {
+    fakeInVolumeMap(FAKE_BLK);
+    fakeBlockOnDisk(FAKE_BLK);
+    fakeBeingCreated(FAKE_BLK);
+    
+    assertFalse(seenOnDisk.containsKey(FAKE_BLK));
+    assertTrue(volumeMap.containsKey(FAKE_BLK));
+    FSDataset.reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+    // should not be added, since it's in the midst of being created!
+    assertFalse(seenOnDisk.containsKey(FAKE_BLK));
+  }
+  
+  /**
+   * Test for case where we reopened a block during the scan
+   */
+  @Test
+  public void testReopenedDuringScan() {
+    fakeSeenByScan(FAKE_BLK);
+    fakeInVolumeMap(FAKE_BLK);
+    fakeBeingCreated(FAKE_BLK);
+    
+    assertTrue(seenOnDisk.containsKey(FAKE_BLK));
+    assertTrue(volumeMap.containsKey(FAKE_BLK));
+    FSDataset.reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+    // should be removed from the map since .exists() returns false
+    assertFalse(seenOnDisk.containsKey(FAKE_BLK));
+  }
+
+  @Test(timeout=20000)
+  public void testAsyncReport() throws Exception {
+    FSDataset mock = Mockito.mock(FSDataset.class);
+    AsyncBlockReport abr = new FSDataset.AsyncBlockReport(mock);
+    abr.start();
+    try {
+      for (int i = 0; i < 3; i++) {
+        HashMap<Block, File> mockResult = new HashMap<Block, File>();
+        Mockito.doReturn(mockResult).when(mock).roughBlockScan();
+        
+        assertFalse(abr.isReady());
+        abr.request();
+        while (!abr.isReady()) {
+          Thread.sleep(10);
+        }
+        assertSame(mockResult, abr.getAndReset());
+        assertFalse(abr.isReady());
+      }      
+    } finally {
+      abr.shutdown();
+    }
+  }
+
+  private void fakeBeingCreated(Block b) {
+    ongoingCreates.put(b,
+        new ActiveFile(blockFile(b), new ArrayList<Thread>()));
+  }
+
+  private void fakeInVolumeMap(Block b) {
+    volumeMap.put(b, new DatanodeBlockInfo(null, blockFile(b)));
+  }
+
+  private void fakeBlockOnDisk(Block b) {
+    File f = blockFile(b);
+    try {
+      f.createNewFile();
+      FSDataset.getMetaFile(f, b).createNewFile();
+    } catch (IOException e) {
+      throw new RuntimeException("Could not create: " + f);
+    }
+  }
+  
+  private void fakeSeenByScan(Block b) {
+    seenOnDisk.put(b, blockFile(b));
+  }
+
+  private File blockFile(Block b) {
+    return new File(TEST_DIR, b.getBlockName());
+  }
+}