
HDFS-11182. Update DataNode to use DatasetVolumeChecker.

Arpit Agarwal 8 years ago
parent
commit ec80de3ccc
19 changed files with 478 additions and 529 deletions
  1. +155 -156  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
  2. +0 -5  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
  3. +63 -27  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
  4. +25 -26  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
  5. +53 -53  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
  6. +0 -7  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
  7. +12 -31  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
  8. +4 -0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
  9. +91 -84  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
  10. +3 -0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
  11. +3 -0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
  12. +3 -0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
  13. +3 -0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java
  14. +17 -9  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
  15. +6 -3  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java
  16. +24 -19  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java
  17. +3 -4  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
  18. +13 -73  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
  19. +0 -32  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java

File diff suppressed because it is too large
+ 155 - 156
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java


+ 0 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java

@@ -30,11 +30,6 @@ import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
 import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
 import org.apache.hadoop.util.DiskChecker;

+ 63 - 27
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java

@@ -27,7 +27,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -164,9 +163,21 @@ public class DatasetVolumeChecker {
   public Set<FsVolumeSpi> checkAllVolumes(
       final FsDatasetSpi<? extends FsVolumeSpi> dataset)
       throws InterruptedException {
-
-    if (timer.monotonicNow() - lastAllVolumesCheck < minDiskCheckGapMs) {
+    final long gap = timer.monotonicNow() - lastAllVolumesCheck;
+    if (gap < minDiskCheckGapMs) {
       numSkippedChecks.incrementAndGet();
+      LOG.trace(
+          "Skipped checking all volumes, time since last check {} is less " +
+          "than the minimum gap between checks ({} ms).",
+          gap, minDiskCheckGapMs);
+      return Collections.emptySet();
+    }
+
+    final FsDatasetSpi.FsVolumeReferences references =
+        dataset.getFsVolumeReferences();
+
+    if (references.size() == 0) {
+      LOG.warn("checkAllVolumesAsync - no volumes can be referenced");
       return Collections.emptySet();
     }
 
@@ -175,9 +186,8 @@ public class DatasetVolumeChecker {
     final Set<FsVolumeSpi> failedVolumes = new HashSet<>();
     final Set<FsVolumeSpi> allVolumes = new HashSet<>();
 
-    final FsDatasetSpi.FsVolumeReferences references =
-        dataset.getFsVolumeReferences();
-    final CountDownLatch resultsLatch = new CountDownLatch(references.size());
+    final AtomicLong numVolumes = new AtomicLong(references.size());
+    final CountDownLatch latch = new CountDownLatch(1);
 
     for (int i = 0; i < references.size(); ++i) {
       final FsVolumeReference reference = references.getReference(i);
@@ -186,12 +196,18 @@ public class DatasetVolumeChecker {
           delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
       LOG.info("Scheduled health check for volume {}", reference.getVolume());
       Futures.addCallback(future, new ResultHandler(
-          reference, healthyVolumes, failedVolumes, resultsLatch, null));
+          reference, healthyVolumes, failedVolumes, numVolumes, new Callback() {
+        @Override
+        public void call(Set<FsVolumeSpi> ignored1,
+                         Set<FsVolumeSpi> ignored2) {
+          latch.countDown();
+        }
+      }));
     }
 
     // Wait until our timeout elapses, after which we give up on
     // the remaining volumes.
-    if (!resultsLatch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS)) {
+    if (!latch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS)) {
       LOG.warn("checkAllVolumes timed out after {} ms" +
           maxAllowedTimeForCheckMs);
     }
@@ -225,18 +241,28 @@ public class DatasetVolumeChecker {
   public boolean checkAllVolumesAsync(
       final FsDatasetSpi<? extends FsVolumeSpi> dataset,
       Callback callback) {
-
-    if (timer.monotonicNow() - lastAllVolumesCheck < minDiskCheckGapMs) {
+    final long gap = timer.monotonicNow() - lastAllVolumesCheck;
+    if (gap < minDiskCheckGapMs) {
       numSkippedChecks.incrementAndGet();
+      LOG.trace(
+          "Skipped checking all volumes, time since last check {} is less " +
+              "than the minimum gap between checks ({} ms).",
+          gap, minDiskCheckGapMs);
+      return false;
+    }
+
+    final FsDatasetSpi.FsVolumeReferences references =
+        dataset.getFsVolumeReferences();
+
+    if (references.size() == 0) {
+      LOG.warn("checkAllVolumesAsync - no volumes can be referenced");
       return false;
     }
 
     lastAllVolumesCheck = timer.monotonicNow();
     final Set<FsVolumeSpi> healthyVolumes = new HashSet<>();
     final Set<FsVolumeSpi> failedVolumes = new HashSet<>();
-    final FsDatasetSpi.FsVolumeReferences references =
-        dataset.getFsVolumeReferences();
-    final CountDownLatch latch = new CountDownLatch(references.size());
+    final AtomicLong numVolumes = new AtomicLong(references.size());
 
     LOG.info("Checking {} volumes", references.size());
     for (int i = 0; i < references.size(); ++i) {
@@ -245,7 +271,7 @@ public class DatasetVolumeChecker {
       ListenableFuture<VolumeCheckResult> future =
           delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
       Futures.addCallback(future, new ResultHandler(
-          reference, healthyVolumes, failedVolumes, latch, callback));
+          reference, healthyVolumes, failedVolumes, numVolumes, callback));
     }
     numAsyncDatasetChecks.incrementAndGet();
     return true;
@@ -273,8 +299,10 @@ public class DatasetVolumeChecker {
    *
    * @param volume the volume that is to be checked.
    * @param callback callback to be invoked when the volume check completes.
+   * @return true if the check was scheduled and the callback will be invoked.
+   *         false otherwise.
    */
-  public void checkVolume(
+  public boolean checkVolume(
       final FsVolumeSpi volume,
       Callback callback) {
     FsVolumeReference volumeReference;
@@ -283,14 +311,15 @@ public class DatasetVolumeChecker {
     } catch (ClosedChannelException e) {
       // The volume has already been closed.
       callback.call(new HashSet<FsVolumeSpi>(), new HashSet<FsVolumeSpi>());
-      return;
+      return false;
     }
     ListenableFuture<VolumeCheckResult> future =
         delegateChecker.schedule(volume, IGNORED_CONTEXT);
     numVolumeChecks.incrementAndGet();
     Futures.addCallback(future, new ResultHandler(
-        volumeReference, new HashSet<FsVolumeSpi>(),
-        new HashSet<FsVolumeSpi>(), new CountDownLatch(1), callback));
+        volumeReference, new HashSet<FsVolumeSpi>(), new HashSet<FsVolumeSpi>(),
+        new AtomicLong(1), callback));
+    return true;
   }
 
   /**
@@ -301,24 +330,33 @@ public class DatasetVolumeChecker {
     private final FsVolumeReference reference;
     private final Set<FsVolumeSpi> failedVolumes;
     private final Set<FsVolumeSpi> healthyVolumes;
-    private final CountDownLatch latch;
-    private final AtomicLong numVolumes;
+    private final AtomicLong volumeCounter;
 
     @Nullable
     private final Callback callback;
 
+    /**
+     *
+     * @param reference FsVolumeReference to be released when the check is
+     *                  complete.
+     * @param healthyVolumes set of healthy volumes. If the disk check is
+     *                       successful, add the volume here.
+     * @param failedVolumes set of failed volumes. If the disk check fails,
+     *                      add the volume here.
+     * @param volumeCounter count of remaining volume checks; used to
+     *                      trigger callback invocation when it reaches zero.
+     * @param callback invoked when all volume checks have completed.
+     */
     ResultHandler(FsVolumeReference reference,
                   Set<FsVolumeSpi> healthyVolumes,
                   Set<FsVolumeSpi> failedVolumes,
-                  CountDownLatch latch,
+                  AtomicLong volumeCounter,
                   @Nullable Callback callback) {
       Preconditions.checkState(reference != null);
       this.reference = reference;
       this.healthyVolumes = healthyVolumes;
       this.failedVolumes = failedVolumes;
-      this.latch = latch;
+      this.volumeCounter = volumeCounter;
       this.callback = callback;
-      numVolumes = new AtomicLong(latch.getCount());
     }
 
     @Override
@@ -372,10 +410,8 @@ public class DatasetVolumeChecker {
 
     private void invokeCallback() {
       try {
-        latch.countDown();
-
-        if (numVolumes.decrementAndGet() == 0 &&
-            callback != null) {
+        final long remaining = volumeCounter.decrementAndGet();
+        if (callback != null && remaining == 0) {
           callback.call(healthyVolumes, failedVolumes);
         }
       } catch(Exception e) {
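
The hunks above replace the per-volume CountDownLatch with a shared AtomicLong (volumeCounter), so the Callback now fires exactly once, from whichever result handler decrements the counter to zero; the synchronous checkAllVolumes() keeps its blocking behavior by registering a Callback that counts down a one-shot CountDownLatch(1). A minimal, self-contained sketch of that pattern follows; the class and method names are illustrative, not part of this patch.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch of the counter-driven callback used by ResultHandler.
class LastCheckFiresCallback {
  // Initialized to the number of scheduled volume checks; the handler
  // that decrements it to zero invokes the callback exactly once.
  private final AtomicLong volumeCounter;
  private final Runnable callback;

  LastCheckFiresCallback(long numVolumes, Runnable callback) {
    this.volumeCounter = new AtomicLong(numVolumes);
    this.callback = callback;
  }

  // Invoked from each check's completion handler, possibly concurrently.
  void onCheckCompleted() {
    if (volumeCounter.decrementAndGet() == 0 && callback != null) {
      callback.run();
    }
  }
}
```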

+ 25 - 26
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java

@@ -65,7 +65,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 /**
  * This is a service provider interface for the underlying storage that
  * stores replicas for a data node.
- * The default implementation stores replicas on local drives. 
+ * The default implementation stores replicas on local drives.
  */
 @InterfaceAudience.Private
 public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
@@ -273,7 +273,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   long getLength(ExtendedBlock b) throws IOException;
 
   /**
-   * Get reference to the replica meta info in the replicasMap. 
+   * Get reference to the replica meta info in the replicasMap.
    * To be called from methods that are synchronized on {@link FSDataset}
    * @return replica from the replicas map
    */
@@ -314,7 +314,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   /**
   * Creates a temporary replica and returns the meta information of the replica.
-   * 
+   *
    * @param b block
    * @return the meta info of the replica which is being written to
    * @throws IOException if an error occurs
@@ -324,7 +324,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
 
   /**
    * Creates a RBW replica and returns the meta info of the replica
-   * 
+   *
    * @param b block
    * @return the meta info of the replica which is being written to
    * @throws IOException if an error occurs
@@ -334,7 +334,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
 
   /**
    * Recovers a RBW replica and returns the meta info of the replica.
-   * 
+   *
    * @param b block
    * @param newGS the new generation stamp for the replica
    * @param minBytesRcvd the minimum number of bytes that the replica could have
@@ -355,7 +355,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
 
   /**
    * Append to a finalized replica and returns the meta info of the replica.
-   * 
+   *
    * @param b block
    * @param newGS the new generation stamp for the replica
    * @param expectedBlockLen the number of bytes the replica is expected to have
@@ -368,7 +368,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   /**
    * Recover a failed append to a finalized replica and returns the meta
    * info of the replica.
-   * 
+   *
    * @param b block
    * @param newGS the new generation stamp for the replica
    * @param expectedBlockLen the number of bytes the replica is expected to have
@@ -377,11 +377,11 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    */
   ReplicaHandler recoverAppend(
       ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException;
-  
+
   /**
    * Recover a failed pipeline close.
   * It bumps the replica's generation stamp and finalizes it if it is an RBW replica.
-   * 
+   *
    * @param b block
    * @param newGS the new generation stamp for the replica
    * @param expectedBlockLen the number of bytes the replica is expected to have
@@ -390,7 +390,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    */
   Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen
       ) throws IOException;
-  
+
   /**
    * Finalizes the block previously opened for writing using writeToBlock.
    * The block size is what is in the parameter b and it must match the amount
@@ -437,19 +437,19 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    *
    * @throws ReplicaNotFoundException          If the replica is not found
    *
-   * @throws UnexpectedReplicaStateException   If the replica is not in the 
+   * @throws UnexpectedReplicaStateException   If the replica is not in the
    *                                             expected state.
-   * @throws FileNotFoundException             If the block file is not found or there 
+   * @throws FileNotFoundException             If the block file is not found or there
    *                                              was an error locating it.
    * @throws EOFException                      If the replica length is too short.
-   * 
-   * @throws IOException                       May be thrown from the methods called. 
+   *
+   * @throws IOException                       May be thrown from the methods called.
    */
   void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
       throws ReplicaNotFoundException, UnexpectedReplicaStateException,
       FileNotFoundException, EOFException, IOException;
-      
-  
+
+
   /**
    * Is the block valid?
    * @return - true if the specified block is valid
@@ -493,10 +493,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   boolean isCached(String bpid, long blockId);
 
     /**
-     * Check if all the data directories are healthy
-     * @return A set of unhealthy data directories.
+     * Handle volume failures by removing the failed volumes.
+     * @param failedVolumes the volumes that failed their health check.
      */
-  Set<File> checkDataDir();
+  void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes);
 
   /**
    * Shutdown the FSDataset
@@ -516,7 +515,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
 
   /**
    * Checks how many valid storage volumes there are in the DataNode.
-   * @return true if more than the minimum number of valid volumes are left 
+   * @return true if more than the minimum number of valid volumes are left
    * in the FSDataSet.
    */
   boolean hasEnoughResource();
@@ -528,7 +527,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
 
   /**
    * Initialize a replica recovery.
-   * @return actual state of the replica on this data-node or 
+   * @return actual state of the replica on this data-node or
    * null if data-node does not have the replica.
    */
   ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock
@@ -555,13 +554,13 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   void shutdownBlockPool(String bpid) ;
 
   /**
-   * Deletes the block pool directories. If force is false, directories are 
-   * deleted only if no block files exist for the block pool. If force 
+   * Deletes the block pool directories. If force is false, directories are
+   * deleted only if no block files exist for the block pool. If force
    * is true entire directory for the blockpool is deleted along with its
    * contents.
    * @param bpid BlockPool Id to be deleted.
    * @param force If force is false, directories are deleted only if no
-   *        block files exist for the block pool, otherwise entire 
+   *        block files exist for the block pool, otherwise entire
    *        directory for the blockpool is deleted along with its contents.
    * @throws IOException
    */
@@ -574,9 +573,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
       ) throws IOException;
 
   /**
-   * Get a {@link HdfsBlocksMetadata} corresponding to the list of blocks in 
+   * Get a {@link HdfsBlocksMetadata} corresponding to the list of blocks in
    * <code>blocks</code>.
-   * 
+   *
    * @param bpid pool to query
    * @param blockIds List of block ids for which to return metadata
    * @return metadata Metadata for the list of blocks
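
The central interface change in this file is the replacement of checkDataDir(), where the dataset probed its own directories and returned the failures, with handleVolumeFailures(Set&lt;FsVolumeSpi&gt;), where the caller supplies the failures found by DatasetVolumeChecker. Below is a hedged sketch of the resulting caller-side wiring; the real wiring lives in DataNode.java (whose diff is suppressed above), and the helper class and method names here are illustrative.

```java
import java.util.Set;

import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

class VolumeFailureWiringSketch {
  // Schedule an async check of all volumes and push any failures back
  // into the dataset, replacing the old pull-style dataset.checkDataDir().
  static boolean scheduleCheck(final DatasetVolumeChecker checker,
      final FsDatasetSpi<? extends FsVolumeSpi> dataset) {
    return checker.checkAllVolumesAsync(dataset,
        new DatasetVolumeChecker.Callback() {
          @Override
          public void call(Set<FsVolumeSpi> healthyVolumes,
                           Set<FsVolumeSpi> failedVolumes) {
            if (!failedVolumes.isEmpty()) {
              dataset.handleVolumeFailures(failedVolumes);
            }
          }
        });
  }
}
```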

+ 53 - 53
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -231,7 +231,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
     return null;
   }
-  
+
   @Override // FsDatasetSpi
   public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
       throws IOException {
@@ -291,7 +291,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @VisibleForTesting
   final AutoCloseableLock datasetLock;
   private final Condition datasetLockCondition;
-  
+
   /**
    * An FSDataset has a directory where it loads its data files.
    */
@@ -324,8 +324,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     if (volsFailed > volFailuresTolerated) {
       throw new DiskErrorException("Too many failed volumes - "
-          + "current valid volumes: " + storage.getNumStorageDirs() 
-          + ", volumes configured: " + volsConfigured 
+          + "current valid volumes: " + storage.getNumStorageDirs()
+          + ", volumes configured: " + volsConfigured
           + ", volumes failed: " + volsFailed
           + ", volume failures tolerated: " + volFailuresTolerated);
     }
@@ -614,9 +614,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       return volumes.getBlockPoolUsed(bpid);
     }
   }
-  
+
   /**
-   * Return true - if there are still valid volumes on the DataNode. 
+   * Return true - if there are still valid volumes on the DataNode.
    */
   @Override // FsDatasetSpi
   public boolean hasEnoughResource() {
@@ -762,7 +762,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private File getBlockFile(ExtendedBlock b) throws IOException {
     return getBlockFile(b.getBlockPoolId(), b.getBlockId());
   }
-  
+
   /**
    * Get File name for a given block.
    */
@@ -824,7 +824,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * block pool Id, block Id and generation stamp must match.
    * @param b extended block
    * @return the meta replica information
-   * @throws ReplicaNotFoundException if no entry is in the map or 
+   * @throws ReplicaNotFoundException if no entry is in the map or
    *                        there is a generation stamp mismatch
    */
   ReplicaInfo getReplicaInfo(ExtendedBlock b)
@@ -849,7 +849,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * @param bpid block pool Id
    * @param blkid block Id
    * @return the meta replica information; null if block was not found
-   * @throws ReplicaNotFoundException if no entry is in the map or 
+   * @throws ReplicaNotFoundException if no entry is in the map or
    *                        there is a generation stamp mismatch
    */
   private ReplicaInfo getReplicaInfo(String bpid, long blkid)
@@ -1190,15 +1190,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   /** Append to a finalized replica
-   * Change a finalized replica to be a RBW replica and 
+   * Change a finalized replica to be a RBW replica and
    * bump its generation stamp to be the newGS
-   * 
+   *
    * @param bpid block pool Id
    * @param replicaInfo a finalized replica
    * @param newGS new generation stamp
   * @param estimateBlockLen estimated block length
    * @return a RBW replica
-   * @throws IOException if moving the replica from finalized directory 
+   * @throws IOException if moving the replica from finalized directory
    *         to rbw directory fails
    */
   private ReplicaBeingWritten append(String bpid,
@@ -1287,10 +1287,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 
-  private ReplicaInfo recoverCheck(ExtendedBlock b, long newGS, 
+  private ReplicaInfo recoverCheck(ExtendedBlock b, long newGS,
       long expectedBlockLen) throws IOException, MustStopExistingWriter {
     ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
-    
+
     // check state
     if (replicaInfo.getState() != ReplicaState.FINALIZED &&
         replicaInfo.getState() != ReplicaState.RBW) {
@@ -1304,10 +1304,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         replicaGenerationStamp > newGS) {
       throw new ReplicaNotFoundException(
           ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + replicaGenerationStamp
-          + ". Expected GS range is [" + b.getGenerationStamp() + ", " + 
+          + ". Expected GS range is [" + b.getGenerationStamp() + ", " +
           newGS + "].");
     }
-    
+
     // stop the previous writer before check a replica's length
     long replicaLen = replicaInfo.getNumBytes();
     if (replicaInfo.getState() == ReplicaState.RBW) {
@@ -1316,22 +1316,22 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         throw new MustStopExistingWriter(rbw);
       }
       // check length: bytesRcvd, bytesOnDisk, and bytesAcked should be the same
-      if (replicaLen != rbw.getBytesOnDisk() 
+      if (replicaLen != rbw.getBytesOnDisk()
           || replicaLen != rbw.getBytesAcked()) {
-        throw new ReplicaAlreadyExistsException("RBW replica " + replicaInfo + 
-            "bytesRcvd(" + rbw.getNumBytes() + "), bytesOnDisk(" + 
+        throw new ReplicaAlreadyExistsException("RBW replica " + replicaInfo +
+            "bytesRcvd(" + rbw.getNumBytes() + "), bytesOnDisk(" +
             rbw.getBytesOnDisk() + "), and bytesAcked(" + rbw.getBytesAcked() +
             ") are not the same.");
       }
     }
-    
+
     // check block length
     if (replicaLen != expectedBlockLen) {
-      throw new IOException("Corrupted replica " + replicaInfo + 
-          " with a length of " + replicaLen + 
+      throw new IOException("Corrupted replica " + replicaInfo +
+          " with a length of " + replicaLen +
           " expected length is " + expectedBlockLen);
     }
-    
+
     return replicaInfo;
   }
 
@@ -1390,7 +1390,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
     }
   }
-  
+
   /**
    * Bump a replica's generation stamp to a new one.
    * Its on-disk meta file name is renamed to be the new one too.
@@ -1559,7 +1559,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       return new ReplicaHandler(rbw, ref);
     }
   }
-  
+
   @Override // FsDatasetSpi
   public ReplicaInPipeline convertTemporaryToRbw(
       final ExtendedBlock b) throws IOException {
@@ -1673,7 +1673,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       // Hang too long, just bail out. This is not supposed to happen.
       long writerStopMs = Time.monotonicNow() - startTimeMs;
       if (writerStopMs > writerStopTimeoutMs) {
-        LOG.warn("Unable to stop existing writer for block " + b + " after " 
+        LOG.warn("Unable to stop existing writer for block " + b + " after "
             + writerStopMs + " miniseconds.");
         throw new IOException("Unable to stop existing writer for block " + b
             + " after " + writerStopMs + " miniseconds.");
@@ -1690,7 +1690,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * last checksum will be overwritten.
    */
   @Override // FsDatasetSpi
-  public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams streams, 
+  public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams streams,
       int checksumSize) throws IOException {
     FileOutputStream file = (FileOutputStream)streams.getChecksumOut();
     FileChannel channel = file.getChannel();
@@ -1807,7 +1807,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       LOG.warn("No file exists for block: " + b);
       return true;
     }
-    
+
     if (!blockFile.delete()) {
       LOG.warn("Not able to delete the block file: " + blockFile);
       return false;
@@ -1910,21 +1910,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * @param state       If this is null, it is ignored.  If it is non-null, we
    *                        will check that the replica has this state.
    *
-   * @throws ReplicaNotFoundException          If the replica is not found 
+   * @throws ReplicaNotFoundException          If the replica is not found
    *
-   * @throws UnexpectedReplicaStateException   If the replica is not in the 
+   * @throws UnexpectedReplicaStateException   If the replica is not in the
    *                                             expected state.
    * @throws FileNotFoundException             If the block file is not found or there
    *                                              was an error locating it.
    * @throws EOFException                      If the replica length is too short.
-   * 
-   * @throws IOException                       May be thrown from the methods called. 
+   *
+   * @throws IOException                       May be thrown from the methods called.
    */
   @Override // FsDatasetSpi
   public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
       throws ReplicaNotFoundException, UnexpectedReplicaStateException,
       FileNotFoundException, EOFException, IOException {
-    final ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), 
+    final ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
         b.getLocalBlock());
     if (replicaInfo == null) {
       throw new ReplicaNotFoundException(b);
@@ -1950,7 +1950,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public boolean isValidBlock(ExtendedBlock b) {
     return isValid(b, ReplicaState.FINALIZED);
   }
-  
+
   /**
    * Check whether the given block is a valid RBW.
    */
@@ -2182,7 +2182,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       genstamp = info.getGenerationStamp();
       volumeExecutor = volume.getCacheExecutor();
     }
-    cacheManager.cacheBlock(blockId, bpid, 
+    cacheManager.cacheBlock(blockId, bpid,
         blockFileName, length, genstamp, volumeExecutor);
   }
 
@@ -2237,12 +2237,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   * if some volumes failed - the caller must remove all the blocks that belong
    * to these failed volumes.
-   * @return the failed volumes. Returns null if no volume failed.
+   * @param failedVolumes the volumes that failed their health check.
    */
   @Override // FsDatasetSpi
-  public Set<File> checkDataDir() {
-   return volumes.checkDirs();
+  public void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
+    volumes.handleVolumeFailures(failedVolumes);
   }
-    
+
 
   @Override // FsDatasetSpi
   public String toString() {
@@ -2250,14 +2251,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   private ObjectName mbeanName;
-  
+
   /**
    * Register the FSDataset MBean using the name
    *        "hadoop:service=DataNode,name=FSDatasetState-<datanodeUuid>"
    */
   void registerMBean(final String datanodeUuid) {
     // We wrap to bypass standard mbean naming convention.
-    // This wraping can be removed in java 6 as it is more flexible in 
+    // This wrapping can be removed in java 6 as it is more flexible in
     // package naming for mbeans and their impl.
     try {
       StandardMBean bean = new StandardMBean(this,FSDatasetMBean.class);
@@ -2280,7 +2281,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     if (mbeanName != null) {
       MBeans.unregister(mbeanName);
     }
-    
+
     if (asyncDiskService != null) {
       asyncDiskService.shutdown();
     }
@@ -2288,7 +2289,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     if (asyncLazyPersistService != null) {
       asyncLazyPersistService.shutdown();
     }
-    
+
     if(volumes != null) {
       volumes.shutdown();
     }
@@ -2497,7 +2498,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     return volumeMap.get(bpid, blockId);
   }
 
-  @Override 
+  @Override
   public String getReplicaString(String bpid, long blockId) {
     try(AutoCloseableLock lock = datasetLock.acquire()) {
       final Replica r = volumeMap.get(bpid, blockId);
@@ -2753,7 +2754,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       return replica.getVisibleLength();
     }
   }
-  
+
   @Override
   public void addBlockPool(String bpid, Configuration conf)
       throws IOException {
@@ -2775,7 +2776,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       volumes.removeBlockPool(bpid, blocksPerVolume);
     }
   }
-  
+
   /**
    * Class for representing the Datanode volume information
    */
@@ -2796,7 +2797,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       this.reservedSpaceForReplicas = v.getReservedForReplicas();
       this.numBlocks = v.getNumBlocks();
     }
-  }  
+  }
 
   private Collection<VolumeInfo> getVolumeInfo() {
     Collection<VolumeInfo> info = new ArrayList<VolumeInfo>();
@@ -2813,7 +2814,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         used = 0;
         free = 0;
       }
-      
+
       info.add(new VolumeInfo(volume, used, free));
     }
     return info;
@@ -2863,7 +2864,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
     }
   }
-  
+
   @Override // FsDatasetSpi
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
       throws IOException {
@@ -3237,7 +3238,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       shouldRun = false;
     }
   }
-  
+
   @Override
   public void setPinning(ExtendedBlock block) throws IOException {
     if (!blockPinningEnabled) {
@@ -3254,7 +3255,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         oldPermission.getGroupAction(), oldPermission.getOtherAction(), true);
     localFS.setPermission(p, permission);
   }
-  
+
   @Override
   public boolean getPinning(ExtendedBlock block) throws IOException {
     if (!blockPinningEnabled) {
@@ -3265,7 +3266,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     FileStatus fss = localFS.getFileStatus(new Path(f.getAbsolutePath()));
     return fss.getPermission().getStickyBit();
   }
-  
+
   @Override
   public boolean isDeletingBlock(String bpid, long blockId) {
     synchronized(deletingBlock) {
@@ -3289,7 +3290,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
     }
   }
-  
+
   private void addDeletingBlock(String bpid, Long blockId) {
     synchronized(deletingBlock) {
       Set<Long> s = deletingBlock.get(bpid);
@@ -3361,4 +3362,3 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 }
-

+ 0 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java

@@ -901,13 +901,6 @@ public class FsVolumeImpl implements FsVolumeSpi {
     return cacheExecutor;
   }
 
-  void checkDirs() throws DiskErrorException {
-    // TODO:FEDERATION valid synchronization
-    for(BlockPoolSlice s : bpSlices.values()) {
-      s.checkDirs();
-    }
-  }
-
   @Override
   public VolumeCheckResult check(VolumeCheckContext ignored)
       throws DiskErrorException {
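
FsVolumeImpl loses its checkDirs() loop over BlockPoolSlices; health checking now goes through the single check(VolumeCheckContext) method visible at the end of this hunk, which is what lets the generic DatasetVolumeChecker schedule volumes without knowing their internals. A simplified, self-contained sketch of that contract follows; the types here are stand-ins for the real Checkable and VolumeCheckResult, not the HDFS classes themselves.

```java
// Simplified stand-ins for the checker contract; illustrative only.
enum SketchCheckResult { HEALTHY, DEGRADED, FAILED }

interface SketchCheckable<K, V> {
  // One entry point the checker can schedule, in place of ad-hoc checkDirs().
  V check(K context) throws Exception;
}

class SketchVolume implements SketchCheckable<Void, SketchCheckResult> {
  @Override
  public SketchCheckResult check(Void ignored) {
    // A real FsVolumeImpl probes its block-pool slice directories here
    // and throws DiskErrorException on failure.
    return SketchCheckResult.HEALTHY;
  }
}
```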

+ 12 - 31
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java

@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -44,7 +43,6 @@ import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.AutoCloseableLock;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Time;
 
 class FsVolumeList {
@@ -96,11 +94,11 @@ class FsVolumeList {
     }
   }
 
-  /** 
+  /**
    * Get next volume.
    *
    * @param blockSize free space needed on the volume
-   * @param storageType the desired {@link StorageType} 
+   * @param storageType the desired {@link StorageType}
    * @return next volume to store the block in.
    */
   FsVolumeReference getNextVolume(StorageType storageType, long blockSize)
@@ -167,7 +165,7 @@ class FsVolumeList {
     }
     return capacity;
   }
-    
+
   long getRemaining() throws IOException {
     long remaining = 0L;
     for (FsVolumeSpi vol : volumes) {
@@ -179,7 +177,7 @@ class FsVolumeList {
     }
     return remaining;
   }
-  
+
   void getAllVolumesMap(final String bpid,
                         final ReplicaMap volumeMap,
                         final RamDiskReplicaTracker ramDiskReplicaMap)
@@ -229,28 +227,17 @@ class FsVolumeList {
 
   /**
   * Removes the given failed volumes from the active volume list.
-   * 
+   *
   * Use {@link checkDirsLock} to allow only one invocation at a time.
    *
-   * @return list of all the failed volumes.
+   * @param failedVolumes the volumes that failed their health check.
    */
-  Set<File> checkDirs() {
+  void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
     try (AutoCloseableLock lock = checkDirsLock.acquire()) {
-      Set<File> failedVols = null;
-      
-      // Make a copy of volumes for performing modification 
-      final List<FsVolumeImpl> volumeList = getVolumes();
-
-      for(Iterator<FsVolumeImpl> i = volumeList.iterator(); i.hasNext(); ) {
-        final FsVolumeImpl fsv = i.next();
+      for(FsVolumeSpi vol : failedVolumes) {
+        FsVolumeImpl fsv = (FsVolumeImpl) vol;
         try (FsVolumeReference ref = fsv.obtainReference()) {
-          fsv.checkDirs();
-        } catch (DiskErrorException e) {
-          FsDatasetImpl.LOG.warn("Removing failed volume " + fsv + ": ", e);
-          if (failedVols == null) {
-            failedVols = new HashSet<>(1);
-          }
-          failedVols.add(new File(fsv.getBasePath()).getAbsoluteFile());
           addVolumeFailureInfo(fsv);
           removeVolume(fsv);
         } catch (ClosedChannelException e) {
@@ -260,14 +247,8 @@ class FsVolumeList {
           FsDatasetImpl.LOG.error("Unexpected IOException", e);
         }
       }
-      
-      if (failedVols != null && failedVols.size() > 0) {
-        FsDatasetImpl.LOG.warn("Completed checkDirs. Found " + failedVols.size()
-            + " failure volumes.");
-      }
 
       waitVolumeRemoved(5000, checkDirsLockCondition);
-      return failedVols;
     }
   }
 
@@ -405,7 +386,7 @@ class FsVolumeList {
 
   void addBlockPool(final String bpid, final Configuration conf) throws IOException {
     long totalStartTime = Time.monotonicNow();
-    
+
     final List<IOException> exceptions = Collections.synchronizedList(
         new ArrayList<IOException>());
     List<Thread> blockPoolAddingThreads = new ArrayList<Thread>();
@@ -442,12 +423,12 @@ class FsVolumeList {
     if (!exceptions.isEmpty()) {
       throw exceptions.get(0);
     }
-    
+
     long totalTimeTaken = Time.monotonicNow() - totalStartTime;
     FsDatasetImpl.LOG.info("Total time to scan all replicas for block pool " +
         bpid + ": " + totalTimeTaken + "ms");
   }
-  
+
   void removeBlockPool(String bpid, Map<DatanodeStorage, BlockListAsLongs>
       blocksPerVolume) {
     for (FsVolumeImpl v : volumes) {

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java

@@ -30,9 +30,11 @@ import java.net.URL;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -52,6 +54,8 @@ public class TestBlockStatsMXBean {
   @Before
   public void setup() throws IOException {
     HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        0, TimeUnit.MILLISECONDS);
     cluster = null;
     StorageType[][] types = new StorageType[6][];
     for (int i=0; i<3; i++) {
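
This test change, like several of the TestDataNode* diffs below, adds the same two lines of setup: with DatasetVolumeChecker in the DataNode, checks scheduled closer together than dfs.datanode.disk.check.min.gap are silently skipped (see the gap logic in the DatasetVolumeChecker hunks above), so tests that inject a volume failure and expect it to be noticed immediately must zero the gap. A minimal sketch of the setup, with an illustrative helper class name:

```java
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

class DiskCheckTestConfSketch {
  // Disable the minimum gap between disk checks so that every injected
  // volume failure is re-checked immediately instead of being skipped.
  static HdfsConfiguration newConf() {
    HdfsConfiguration conf = new HdfsConfiguration();
    conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
        0, TimeUnit.MILLISECONDS);
    return conf;
  }
}
```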

+ 91 - 84
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -68,18 +68,18 @@ import org.apache.hadoop.util.DataChecksum;
 
 /**
  * This class implements a simulated FSDataset.
- * 
+ *
  * Blocks that are created are recorded but their data (plus their CRCs) are
  *  discarded.
  * Fixed data is returned when blocks are read; a null CRC meta file is
  * created for such data.
- * 
+ *
  * This FSDataset does not remember any block information across its
  * restarts; it does however offer an operation to inject blocks
 *  (see TestInjectionForSimulatedStorage()
 * for a usage example of injection).
- * 
- * Note the synchronization is coarse grained - it is at each method. 
+ *
+ * Note the synchronization is coarse grained - it is at each method.
  */
 public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   public final static int BYTE_MASK = 0xff;
@@ -95,7 +95,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       return true;
     }
   }
-  
+
   public static void setFactory(Configuration conf) {
     conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
         Factory.class.getName());
@@ -105,12 +105,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     byte firstByte = (byte) (b.getBlockId() & BYTE_MASK);
     return (byte) ((firstByte + offsetInBlk) & BYTE_MASK);
   }
-  
+
   public static final String CONFIG_PROPERTY_CAPACITY =
       "dfs.datanode.simulateddatastorage.capacity";
-  
+
  public static final long DEFAULT_CAPACITY = 2L<<40; // 2 terabytes
-  
+
   public static final String CONFIG_PROPERTY_STATE =
       "dfs.datanode.simulateddatastorage.state";
   private static final DatanodeStorage.State DEFAULT_STATE =
@@ -145,7 +145,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       if (theBlock.getNumBytes() < 0) {
         theBlock.setNumBytes(0);
       }
-      if (!storage.alloc(bpid, theBlock.getNumBytes())) { 
+      if (!storage.alloc(bpid, theBlock.getNumBytes())) {
         // expected length - actual length may
         // be more - we find out at finalize
         DataNode.LOG.warn("Lack of free storage on a block alloc");
@@ -160,7 +160,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
         oStream = null;
       }
     }
-    
+
     @Override
     public String getStorageUuid() {
       return storage.getStorageUuid();
@@ -188,7 +188,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
         theBlock.setNumBytes(length);
       }
     }
-    
+
     synchronized SimulatedInputStream getIStream() {
       if (!finalized) {
         // throw new IOException("Trying to read an unfinalized block");
@@ -197,12 +197,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
         return new SimulatedInputStream(theBlock.getNumBytes(), theBlock);
       }
     }
-    
+
     synchronized void finalizeBlock(String bpid, long finalSize)
         throws IOException {
       if (finalized) {
         throw new IOException(
-            "Finalizing a block that has already been finalized" + 
+            "Finalizing a block that has already been finalized" +
             theBlock.getBlockId());
       }
       if (oStream == null) {
@@ -216,7 +216,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
         throw new IOException(
           "Size passed to finalize does not match the amount of data written");
       }
-      // We had allocated the expected length when block was created; 
+      // We had allocated the expected length when block was created;
       // adjust if necessary
       long extraLen = finalSize - theBlock.getNumBytes();
       if (extraLen > 0) {
@@ -227,7 +227,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       } else {
         storage.free(bpid, -extraLen);
       }
-      theBlock.setNumBytes(finalSize);  
+      theBlock.setNumBytes(finalSize);
 
       finalized = true;
       oStream = null;
@@ -248,7 +248,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     }
 
     SimulatedInputStream getMetaIStream() {
-      return new SimulatedInputStream(nullCrcFileData);  
+      return new SimulatedInputStream(nullCrcFileData);
     }
 
     synchronized boolean isFinalized() {
@@ -257,8 +257,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
     @Override
     synchronized public ReplicaOutputStreams createStreams(boolean isCreate,
-        DataChecksum requestedChecksum)
-        throws IOException {
+        DataChecksum requestedChecksum) throws IOException {
       if (finalized) {
         throw new IOException("Trying to write to a finalized replica "
             + theBlock);
@@ -333,31 +332,31 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       return false;
     }
   }
-  
+
   /**
    * Class is used for tracking block pool storage utilization similar
    * to {@link BlockPoolSlice}
    */
   private static class SimulatedBPStorage {
     private long used;    // in bytes
-    
+
     long getUsed() {
       return used;
     }
-    
+
     void alloc(long amount) {
       used += amount;
     }
-    
+
     void free(long amount) {
       used -= amount;
     }
-    
+
     SimulatedBPStorage() {
-      used = 0;   
+      used = 0;
     }
   }
-  
+
   /**
    * Class used for tracking datanode level storage utilization similar
    * to {@link FSVolumeSet}
@@ -368,15 +367,15 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
     private final long capacity;  // in bytes
     private final DatanodeStorage dnStorage;
-    
+
     synchronized long getFree() {
       return capacity - getUsed();
     }
-    
+
     long getCapacity() {
       return capacity;
     }
-    
+
     synchronized long getUsed() {
       long used = 0;
       for (SimulatedBPStorage bpStorage : map.values()) {
@@ -384,11 +383,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       }
       return used;
     }
-    
+
     synchronized long getBlockPoolUsed(String bpid) throws IOException {
       return getBPStorage(bpid).getUsed();
     }
-    
+
     int getNumFailedVolumes() {
       return 0;
     }
@@ -398,20 +397,20 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
         getBPStorage(bpid).alloc(amount);
         return true;
       }
-      return false;    
+      return false;
     }
-    
+
     synchronized void free(String bpid, long amount) throws IOException {
       getBPStorage(bpid).free(amount);
     }
-    
+
     SimulatedStorage(long cap, DatanodeStorage.State state) {
       capacity = cap;
       dnStorage = new DatanodeStorage(
           "SimulatedStorage-" + DatanodeStorage.generateUuid(),
           state, StorageType.DEFAULT);
     }
-    
+
     synchronized void addBlockPool(String bpid) {
       SimulatedBPStorage bpStorage = map.get(bpid);
       if (bpStorage != null) {
@@ -419,11 +418,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       }
       map.put(bpid, new SimulatedBPStorage());
     }
-    
+
     synchronized void removeBlockPool(String bpid) {
       map.remove(bpid);
     }
-    
+
     private SimulatedBPStorage getBPStorage(String bpid) throws IOException {
       SimulatedBPStorage bpStorage = map.get(bpid);
       if (bpStorage == null) {
@@ -435,7 +434,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     String getStorageUuid() {
       return dnStorage.getStorageID();
     }
-    
+
     DatanodeStorage getDnStorage() {
       return dnStorage;
     }
@@ -462,7 +461,17 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
     @Override
     public FsVolumeReference obtainReference() throws ClosedChannelException {
-      return null;
+      return new FsVolumeReference() {
+        @Override
+        public void close() throws IOException {
+          // no-op.
+        }
+
+        @Override
+        public FsVolumeSpi getVolume() {
+          return SimulatedVolume.this;
+        }
+      };
     }
 
     @Override
@@ -562,7 +571,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   private final SimulatedVolume volume;
   private final String datanodeUuid;
   private final DataNode datanode;
-  
+
 
   public SimulatedFSDataset(DataStorage storage, Configuration conf) {
     this(null, storage, conf);
@@ -611,14 +620,14 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
         map = new HashMap<Block, BInfo>();
         blockMap.put(bpid, map);
       }
-      
+
       for (Block b: injectBlocks) {
         BInfo binfo = new BInfo(bpid, b, false);
         map.put(binfo.theBlock, binfo);
       }
     }
   }
-  
+
   /** Get a map for a given block pool Id */
   private Map<Block, BInfo> getMap(String bpid) throws IOException {
     final Map<Block, BInfo> map = blockMap.get(bpid);
@@ -684,7 +693,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   public long getBlockPoolUsed(String bpid) throws IOException {
     return storage.getBlockPoolUsed(bpid);
   }
-  
+
   @Override // FSDatasetMBean
   public long getRemaining() {
     return storage.getFree();
@@ -775,7 +784,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     return null;
   }
 
-  @Override 
+  @Override
   public synchronized String getReplicaString(String bpid, long blockId) {
     Replica r = null;
     final Map<Block, BInfo> map = blockMap.get(bpid);
@@ -865,14 +874,14 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
    *
    * @throws ReplicaNotFoundException          If the replica is not found
    *
-   * @throws UnexpectedReplicaStateException   If the replica is not in the 
+   * @throws UnexpectedReplicaStateException   If the replica is not in the
    *                                             expected state.
    */
   @Override // {@link FsDatasetSpi}
   public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
       throws ReplicaNotFoundException, UnexpectedReplicaStateException {
     final BInfo binfo = getBInfo(b);
-    
+
     if (binfo == null) {
       throw new ReplicaNotFoundException(b);
     }
@@ -956,7 +965,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     map.put(binfo.theBlock, binfo);
     return binfo;
   }
-  
+
   @Override // FsDatasetSpi
   public synchronized ReplicaHandler recoverRbw(
       ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
@@ -988,11 +997,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   public synchronized ReplicaHandler createTemporary(
       StorageType storageType, ExtendedBlock b) throws IOException {
     if (isValidBlock(b)) {
-          throw new ReplicaAlreadyExistsException("Block " + b + 
+          throw new ReplicaAlreadyExistsException("Block " + b +
               " is valid, and cannot be written to.");
       }
     if (isValidRbw(b)) {
-        throw new ReplicaAlreadyExistsException("Block " + b + 
+        throw new ReplicaAlreadyExistsException("Block " + b +
             " is being written, and cannot be written to.");
     }
     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
@@ -1006,12 +1015,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
     BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
-      throw new IOException("No such Block " + b );  
+      throw new IOException("No such Block " + b );
     }
-    
+
     return binfo.getIStream();
   }
-  
+
   @Override // FsDatasetSpi
   public synchronized InputStream getBlockInputStream(ExtendedBlock b,
       long seekOffset) throws IOException {
@@ -1033,10 +1042,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
     BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
-      throw new IOException("No such Block " + b );  
+      throw new IOException("No such Block " + b );
     }
     if (!binfo.finalized) {
-      throw new IOException("Block " + b + 
+      throw new IOException("Block " + b +
           " is being written, its meta cannot be read");
     }
     final SimulatedInputStream sin = binfo.getMetaIStream();
@@ -1044,19 +1053,17 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   @Override
-  public Set<File> checkDataDir() {
-    // nothing to check for simulated data set
-    return null;
+  public void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
   }
 
   @Override // FsDatasetSpi
   public synchronized void adjustCrcChannelPosition(ExtendedBlock b,
-                                              ReplicaOutputStreams stream, 
+                                              ReplicaOutputStreams stream,
                                               int checksumSize)
                                               throws IOException {
   }
 
-  /** 
+  /**
    * Simulated input and output streams
    *
    */
@@ -1065,7 +1072,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     int currentPos = 0;
     byte[] data = null;
     Block theBlock = null;
-    
+
     /**
      * An input stream of size l with repeated bytes
      * @param l size of the stream
@@ -1075,7 +1082,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       length = l;
       theBlock = b;
     }
-    
+
     /**
     * An input stream of the supplied data
      * @param iData data to construct the stream
@@ -1084,7 +1091,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       data = iData;
       length = data.length;
     }
-    
+
     /**
     * @return the length of the input stream
      */
@@ -1103,9 +1110,9 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
         return simulatedByte(theBlock, currentPos++) & BYTE_MASK;
       }
     }
-    
+
     @Override
-    public int read(byte[] b) throws IOException { 
+    public int read(byte[] b) throws IOException {
 
       if (b == null) {
         throw new NullPointerException();
@@ -1128,7 +1135,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       return bytesRead;
     }
   }
-  
+
   /**
    * This class implements an output stream that merely throws its data away, but records its
    * length.
@@ -1136,15 +1143,15 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
    */
   static private class SimulatedOutputStream extends OutputStream {
     long length = 0;
-    
+
     /**
     * Constructor for SimulatedOutputStream.
      */
     SimulatedOutputStream() {
     }
-    
+
     /**
-     * 
+     *
      * @return the length of the data created so far.
      */
     long getLength() {
@@ -1156,17 +1163,17 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     void setLength(long length) {
       this.length = length;
     }
-    
+
     @Override
     public void write(int arg0) throws IOException {
       length++;
     }
-    
+
     @Override
     public void write(byte[] b) throws IOException {
       length += b.length;
     }
-    
+
     @Override
     public void write(byte[] b,
               int off,
@@ -1174,11 +1181,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       length += len;
     }
   }
-  
+
   private ObjectName mbeanName;
 
 
-  
+
   /**
    * Register the FSDataset MBean using the name
    *        "hadoop:service=DataNode,name=FSDatasetState-<storageid>"
@@ -1187,7 +1194,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
    */
   void registerMBean(final String storageId) {
    // We wrap to bypass standard mbean naming convention.
-    // This wraping can be removed in java 6 as it is more flexible in 
+    // This wrapping can be removed in java 6 as it is more flexible in
     // package naming for mbeans and their impl.
     StandardMBean bean;
 
@@ -1198,7 +1205,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     } catch (NotCompliantMBeanException e) {
       DataNode.LOG.warn("Error registering FSDatasetState MBean", e);
     }
- 
+
     DataNode.LOG.info("Registered FSDatasetState MBean");
   }
 
@@ -1211,7 +1218,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   public String getStorageInfo() {
     return "Simulated FSDataset-" + datanodeUuid;
   }
-  
+
   @Override
   public boolean hasEnoughResource() {
     return true;
@@ -1224,11 +1231,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
     BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
-      throw new IOException("No such Block " + b );  
+      throw new IOException("No such Block " + b );
     }
 
-    return new ReplicaRecoveryInfo(binfo.getBlockId(), binfo.getBytesOnDisk(), 
-        binfo.getGenerationStamp(), 
+    return new ReplicaRecoveryInfo(binfo.getBlockId(), binfo.getBytesOnDisk(),
+        binfo.getGenerationStamp(),
         binfo.isFinalized()?ReplicaState.FINALIZED : ReplicaState.RBW);
   }
 
@@ -1251,13 +1258,13 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     blockMap.put(bpid, map);
     storage.addBlockPool(bpid);
   }
-  
+
   @Override // FsDatasetSpi
   public void shutdownBlockPool(String bpid) {
     blockMap.remove(bpid);
     storage.removeBlockPool(bpid);
   }
-  
+
   @Override // FsDatasetSpi
   public void deleteBlockPool(String bpid, boolean force) {
      return;
@@ -1321,7 +1328,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
   @Override
   public FsVolumeReferences getFsVolumeReferences() {
-    throw new UnsupportedOperationException();
+    return new FsVolumeReferences(Collections.singletonList(volume));
   }
 
   @Override
@@ -1386,17 +1393,17 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     // TODO Auto-generated method stub
     return null;
   }
-  
+
   @Override
   public void setPinning(ExtendedBlock b) throws IOException {
     blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned = true;
   }
-  
+
   @Override
   public boolean getPinning(ExtendedBlock b) throws IOException {
     return blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned;
   }
-  
+
   @Override
   public boolean isDeletingBlock(String bpid, long blockId) {
     throw new UnsupportedOperationException();

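The one behavioral change in SimulatedFSDataset above is getFsVolumeReferences(): instead of throwing UnsupportedOperationException it now hands back a FsVolumeReferences wrapping its single simulated volume, so DatasetVolumeChecker can walk a simulated dataset the same way it walks a real one. A minimal sketch of the consuming side, assuming FsVolumeReferences implements Iterable<FsVolumeSpi> and Closeable as declared in FsDatasetSpi (the logVolumes helper and class name are hypothetical, not part of this patch):

import java.io.IOException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

public class VolumeRefExample {
  static void logVolumes(FsDatasetSpi<? extends FsVolumeSpi> dataset)
      throws IOException {
    // try-with-resources: closing the references releases the per-volume
    // reference counts, so a concurrent hot-swap cannot remove a volume
    // out from under the iteration.
    try (FsDatasetSpi.FsVolumeReferences refs =
             dataset.getFsVolumeReferences()) {
      for (FsVolumeSpi volume : refs) {
        System.out.println("volume: " + volume.getStorageID());
      }
    }
  }
}
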
+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java

@@ -66,6 +66,7 @@ import java.util.Random;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -124,6 +125,8 @@ public class TestDataNodeHotSwapVolumes {
         1000);
     /* Allow 1 volume failure */
     conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+    conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        0, TimeUnit.MILLISECONDS);
 
     MiniDFSNNTopology nnTopology =
         MiniDFSNNTopology.simpleFederatedTopology(numNameNodes);

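The two lines added to the test setup pin dfs.datanode.disk.check.min.gap to zero; the same change appears in the next three volume-failure tests. Without it, DatasetVolumeChecker would silently skip the back-to-back checks these tests trigger, because consecutive checks inside the minimum gap are throttled. A self-contained sketch of the configuration pattern (the class name is hypothetical):

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class DiskCheckGapExample {
  public static void main(String[] args) {
    HdfsConfiguration conf = new HdfsConfiguration();
    // A zero gap disables throttling, so every test-triggered disk
    // check actually runs instead of being skipped.
    conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
        0, TimeUnit.MILLISECONDS);
    // getTimeDuration converts whatever unit was stored to the
    // caller's requested unit.
    long gapMs = conf.getTimeDuration(
        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
        0, TimeUnit.MILLISECONDS);
    System.out.println("min disk check gap = " + gapMs + " ms");
  }
}
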
+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

@@ -34,6 +34,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -119,6 +120,8 @@ public class TestDataNodeVolumeFailure {
     // Allow a single volume failure (there are two volumes)
     conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30);
+    conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        0, TimeUnit.MILLISECONDS);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dn_num).build();
     cluster.waitActive();
     fs = cluster.getFileSystem();

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java

@@ -29,6 +29,7 @@ import static org.junit.Assume.assumeTrue;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -621,6 +622,8 @@ public class TestDataNodeVolumeFailureReporting {
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
     conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
         failedVolumesTolerated);
+    conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        0, TimeUnit.MILLISECONDS);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes)
         .storagesPerDatanode(storagesPerDatanode).build();
     cluster.waitActive();

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java

@@ -24,6 +24,7 @@ import static org.junit.Assume.assumeTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -144,6 +145,8 @@ public class TestDataNodeVolumeFailureToleration {
     // Bring up two additional datanodes that need both of their volumes
     // functioning in order to stay up.
     conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 0);
+    conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        0, TimeUnit.MILLISECONDS);
     cluster.startDataNodes(conf, 2, true, null, null);
     cluster.waitActive();
     final DatanodeManager dm = cluster.getNamesystem().getBlockManager(

+ 17 - 9
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java

@@ -26,7 +26,9 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -49,8 +51,8 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.Time;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -69,6 +71,9 @@ public class TestDiskError {
   public void setUp() throws Exception {
     conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512L);
+    conf.setTimeDuration(
+        DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        0, TimeUnit.MILLISECONDS);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
     cluster.waitActive();
     fs = cluster.getFileSystem();
@@ -214,19 +219,22 @@ public class TestDiskError {
   * Before refactoring the code, the above function was not getting called.
    * @throws IOException, InterruptedException
    */
-  @Test
-  public void testcheckDiskError() throws IOException, InterruptedException {
+  @Test(timeout=60000)
+  public void testcheckDiskError() throws Exception {
     if(cluster.getDataNodes().size() <= 0) {
       cluster.startDataNodes(conf, 1, true, null, null);
       cluster.waitActive();
     }
-    DataNode dataNode = cluster.getDataNodes().get(0);
-    long slackTime = dataNode.checkDiskErrorInterval/2;
+    final DataNode dataNode = cluster.getDataNodes().get(0);
     //checking for disk error
-    dataNode.checkDiskErrorAsync();
-    Thread.sleep(dataNode.checkDiskErrorInterval);
-    long lastDiskErrorCheck = dataNode.getLastDiskErrorCheck();
-    assertTrue("Disk Error check is not performed within  " + dataNode.checkDiskErrorInterval +  "  ms", ((Time.monotonicNow()-lastDiskErrorCheck) < (dataNode.checkDiskErrorInterval + slackTime)));
+    final long lastCheckTimestamp = dataNode.getLastDiskErrorCheck();
+    dataNode.checkDiskError();
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return dataNode.getLastDiskErrorCheck() > lastCheckTimestamp;
+      }
+    }, 100, 60000);
   }
 
   @Test

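The rewritten testcheckDiskError() drops the fixed Thread.sleep() plus slack-time arithmetic, which was inherently racy, and instead polls until getLastDiskErrorCheck() advances. A runnable sketch of the GenericTestUtils.waitFor idiom, assuming the (Supplier<Boolean>, int, int) overload used above; the condition here is a stand-in for the real one:

import java.util.concurrent.TimeoutException;
import com.google.common.base.Supplier;
import org.apache.hadoop.test.GenericTestUtils;

public class WaitForExample {
  public static void main(String[] args)
      throws TimeoutException, InterruptedException {
    final long start = System.currentTimeMillis();
    // Poll every 100 ms; give up with a TimeoutException after 60 s.
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        // Stand-in for "getLastDiskErrorCheck() > lastCheckTimestamp".
        return System.currentTimeMillis() - start > 500;
      }
    }, 100, 60000);
    System.out.println("condition met");
  }
}
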
+ 6 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java

@@ -21,7 +21,6 @@ package org.apache.hadoop.hdfs.server.datanode.checker;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.*;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.VolumeCheckContext;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -35,7 +34,10 @@ import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -229,6 +231,7 @@ public class TestDatasetVolumeChecker {
   static FsDatasetSpi<FsVolumeSpi> makeDataset(List<FsVolumeSpi> volumes)
       throws Exception {
     // Create dataset and init volume health.
+    @SuppressWarnings("unchecked") // The cast is safe.
     final FsDatasetSpi<FsVolumeSpi> dataset = mock(FsDatasetSpi.class);
     final FsDatasetSpi.FsVolumeReferences references = new
         FsDatasetSpi.FsVolumeReferences(volumes);
@@ -236,7 +239,7 @@ public class TestDatasetVolumeChecker {
     return dataset;
   }
 
-  private static List<FsVolumeSpi> makeVolumes(
+  static List<FsVolumeSpi> makeVolumes(
       int numVolumes, VolumeCheckResult health) throws Exception {
     final List<FsVolumeSpi> volumes = new ArrayList<>(numVolumes);
     for (int i = 0; i < numVolumes; ++i) {

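makeVolumes() is widened from private to package-private here so that TestDatasetVolumeCheckerFailures (next file) can reuse it instead of passing an empty volume list. A condensed sketch of what makeDataset()/makeVolumes() build together, assuming the Mockito stubbing conventions visible in this class (healthyMockDataset is a hypothetical name):

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Collections;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.VolumeCheckContext;

public class MockVolumeExample {
  @SuppressWarnings("unchecked") // mock(FsDatasetSpi.class) is raw-typed.
  static FsDatasetSpi<FsVolumeSpi> healthyMockDataset() throws Exception {
    // One mock volume that always reports HEALTHY when checked.
    FsVolumeSpi volume = mock(FsVolumeSpi.class);
    when(volume.check(any(VolumeCheckContext.class)))
        .thenReturn(VolumeCheckResult.HEALTHY);
    // obtainReference() must be stubbed: FsVolumeReferences pins each
    // volume through it at construction time.
    FsVolumeReference ref = mock(FsVolumeReference.class);
    when(ref.getVolume()).thenReturn(volume);
    when(volume.obtainReference()).thenReturn(ref);
    // The dataset exposes its volumes through getFsVolumeReferences().
    FsDatasetSpi<FsVolumeSpi> dataset = mock(FsDatasetSpi.class);
    when(dataset.getFsVolumeReferences()).thenReturn(
        new FsDatasetSpi.FsVolumeReferences(
            Collections.singletonList(volume)));
    return dataset;
  }
}
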
+ 24 - 19
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java

@@ -21,10 +21,10 @@ package org.apache.hadoop.hdfs.server.datanode.checker;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.*;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.VolumeCheckContext;
 import org.apache.hadoop.util.FakeTimer;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -37,7 +37,6 @@ import java.util.*;
 
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.*;
 
 
@@ -48,6 +47,19 @@ public class TestDatasetVolumeCheckerFailures {
   public static final Logger LOG =LoggerFactory.getLogger(
       TestDatasetVolumeCheckerFailures.class);
 
+  private FakeTimer timer;
+  private Configuration conf;
+
+  private static final long MIN_DISK_CHECK_GAP_MS = 1000; // 1 second.
+
+  @Before
+  public void commonInit() {
+    timer = new FakeTimer();
+    conf = new HdfsConfiguration();
+    conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
+        MIN_DISK_CHECK_GAP_MS, TimeUnit.MILLISECONDS);
+  }
+
   /**
    * Test timeout in {@link DatasetVolumeChecker#checkAllVolumes}.
    * @throws Exception
@@ -62,7 +74,6 @@ public class TestDatasetVolumeCheckerFailures {
         TestDatasetVolumeChecker.makeDataset(volumes);
 
     // Create a disk checker with a very low timeout.
-    final HdfsConfiguration conf = new HdfsConfiguration();
     conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
         1, TimeUnit.SECONDS);
     final DatasetVolumeChecker checker =
@@ -87,10 +98,10 @@ public class TestDatasetVolumeCheckerFailures {
     final FsDatasetSpi<FsVolumeSpi> dataset =
         TestDatasetVolumeChecker.makeDataset(volumes);
 
-    DatasetVolumeChecker checker =
-        new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer());
+    DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer);
     Set<FsVolumeSpi> failedVolumes = checker.checkAllVolumes(dataset);
     assertThat(failedVolumes.size(), is(0));
+    assertThat(checker.getNumSyncDatasetChecks(), is(0L));
 
     // The closed volume should not have been checked as it cannot
     // be referenced.
@@ -99,13 +110,10 @@ public class TestDatasetVolumeCheckerFailures {
 
   @Test(timeout=60000)
   public void testMinGapIsEnforcedForSyncChecks() throws Exception {
+    final List<FsVolumeSpi> volumes =
+        TestDatasetVolumeChecker.makeVolumes(1, VolumeCheckResult.HEALTHY);
     final FsDatasetSpi<FsVolumeSpi> dataset =
-        TestDatasetVolumeChecker.makeDataset(new ArrayList<FsVolumeSpi>());
-    final FakeTimer timer = new FakeTimer();
-    final Configuration conf = new HdfsConfiguration();
-    final long minGapMs = 100;
-    conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
-        minGapMs, TimeUnit.MILLISECONDS);
+        TestDatasetVolumeChecker.makeDataset(volumes);
     final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer);
 
     checker.checkAllVolumes(dataset);
@@ -117,7 +125,7 @@ public class TestDatasetVolumeCheckerFailures {
     assertThat(checker.getNumSkippedChecks(), is(1L));
 
     // Re-check after advancing the timer. Ensure the check is performed.
-    timer.advance(minGapMs);
+    timer.advance(MIN_DISK_CHECK_GAP_MS);
     checker.checkAllVolumes(dataset);
     assertThat(checker.getNumSyncDatasetChecks(), is(2L));
     assertThat(checker.getNumSkippedChecks(), is(1L));
@@ -125,13 +133,10 @@ public class TestDatasetVolumeCheckerFailures {
 
   @Test(timeout=60000)
   public void testMinGapIsEnforcedForASyncChecks() throws Exception {
+    final List<FsVolumeSpi> volumes =
+        TestDatasetVolumeChecker.makeVolumes(1, VolumeCheckResult.HEALTHY);
     final FsDatasetSpi<FsVolumeSpi> dataset =
-        TestDatasetVolumeChecker.makeDataset(new ArrayList<FsVolumeSpi>());
-    final FakeTimer timer = new FakeTimer();
-    final Configuration conf = new HdfsConfiguration();
-    final long minGapMs = 100;
-    conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
-        minGapMs, TimeUnit.MILLISECONDS);
+        TestDatasetVolumeChecker.makeDataset(volumes);
     final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer);
 
     checker.checkAllVolumesAsync(dataset, null);
@@ -143,7 +148,7 @@ public class TestDatasetVolumeCheckerFailures {
     assertThat(checker.getNumSkippedChecks(), is(1L));
 
     // Re-check after advancing the timer. Ensure the check is performed.
-    timer.advance(minGapMs);
+    timer.advance(MIN_DISK_CHECK_GAP_MS);
     checker.checkAllVolumesAsync(dataset, null);
     assertThat(checker.getNumAsyncDatasetChecks(), is(2L));
     assertThat(checker.getNumSkippedChecks(), is(1L));

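Both min-gap tests now build a dataset with one healthy mock volume instead of an empty list, so each non-skipped call performs an actual volume check, and they share the FakeTimer and configuration created once in commonInit(). The throttling rule they exercise can be pictured with this simplified, hypothetical sketch (ThrottleSketch is an illustration, not the real DatasetVolumeChecker internals):

import org.apache.hadoop.util.FakeTimer;
import org.apache.hadoop.util.Timer;

public class ThrottleSketch {
  private final Timer timer;
  private final long minGapMs;
  private long lastCheckMs = Long.MIN_VALUE;

  ThrottleSketch(Timer timer, long minGapMs) {
    this.timer = timer;
    this.minGapMs = minGapMs;
  }

  boolean tryCheck() {
    long now = timer.monotonicNow();
    if (lastCheckMs != Long.MIN_VALUE && now - lastCheckMs < minGapMs) {
      return false; // skipped; compare getNumSkippedChecks() above
    }
    lastCheckMs = now;
    return true;    // performed; compare getNumSyncDatasetChecks()
  }

  public static void main(String[] args) {
    FakeTimer timer = new FakeTimer();
    ThrottleSketch t = new ThrottleSketch(timer, 1000);
    System.out.println(t.tryCheck()); // true:  first check runs
    System.out.println(t.tryCheck()); // false: inside the minimum gap
    timer.advance(1000);              // same trick as the tests above
    System.out.println(t.tryCheck()); // true:  the gap has elapsed
  }
}
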
+ 3 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java

@@ -238,8 +238,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   }
 
   @Override
-  public Set<File> checkDataDir() {
-    return null;
+  public void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
   }
 
   @Override
@@ -432,14 +431,14 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   }
 
   @Override
-  public void setPinning(ExtendedBlock block) throws IOException {    
+  public void setPinning(ExtendedBlock block) throws IOException {
   }
 
   @Override
   public boolean getPinning(ExtendedBlock block) throws IOException {
     return false;
   }
-  
+
   @Override
   public boolean isDeletingBlock(String bpid, long blockId) {
     return false;

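The interface change here mirrors the rest of the patch: FsDatasetSpi implementations no longer probe their own directories via checkDataDir(); the DataNode detects failures with DatasetVolumeChecker and pushes the failed set down through handleVolumeFailures(). A sketch of the resulting caller-side flow, assuming the checkAllVolumes() signature exercised in the tests above (checkAndHandle is a hypothetical helper):

import java.util.Set;
import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

public class VolumeFailureFlow {
  static void checkAndHandle(DatasetVolumeChecker checker,
      FsDatasetSpi<FsVolumeSpi> dataset) throws Exception {
    // The checker owns failure detection...
    Set<FsVolumeSpi> failed = checker.checkAllVolumes(dataset);
    // ...and the dataset only reacts to the verdict.
    if (!failed.isEmpty()) {
      dataset.handleVolumeFailures(failed);
    }
  }
}
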
+ 13 - 73
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java

@@ -51,12 +51,9 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.FakeTimer;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Assert;
@@ -64,8 +61,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Matchers;
 import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -74,16 +69,18 @@ import java.io.OutputStreamWriter;
 import java.io.Writer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
@@ -92,13 +89,10 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import org.slf4j.Logger;
@@ -120,7 +114,7 @@ public class TestFsDatasetImpl {
   private DataNode datanode;
   private DataStorage storage;
   private FsDatasetImpl dataset;
-  
+
   private final static String BLOCKPOOL = "BP-TEST";
 
   private static Storage.StorageDirectory createStorageDirectory(File root) {
@@ -324,64 +318,6 @@ public class TestFsDatasetImpl {
     assertEquals(numExistingVolumes, getNumVolumes());
   }
 
-  @Test(timeout = 5000)
-  public void testChangeVolumeWithRunningCheckDirs() throws IOException {
-    RoundRobinVolumeChoosingPolicy<FsVolumeImpl> blockChooser =
-        new RoundRobinVolumeChoosingPolicy<>();
-    conf.setLong(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
-    final BlockScanner blockScanner = new BlockScanner(datanode);
-    final FsVolumeList volumeList = new FsVolumeList(
-        Collections.<VolumeFailureInfo>emptyList(), blockScanner, blockChooser);
-    final List<FsVolumeImpl> oldVolumes = new ArrayList<>();
-
-    // Initialize FsVolumeList with 5 mock volumes.
-    final int NUM_VOLUMES = 5;
-    for (int i = 0; i < NUM_VOLUMES; i++) {
-      FsVolumeImpl volume = mock(FsVolumeImpl.class);
-      oldVolumes.add(volume);
-      when(volume.getBasePath()).thenReturn("data" + i);
-      when(volume.checkClosed()).thenReturn(true);
-      FsVolumeReference ref = mock(FsVolumeReference.class);
-      when(ref.getVolume()).thenReturn(volume);
-      volumeList.addVolume(ref);
-    }
-
-    // When call checkDirs() on the 2nd volume, anther "thread" removes the 5th
-    // volume and add another volume. It does not affect checkDirs() running.
-    final FsVolumeImpl newVolume = mock(FsVolumeImpl.class);
-    final FsVolumeReference newRef = mock(FsVolumeReference.class);
-    when(newRef.getVolume()).thenReturn(newVolume);
-    when(newVolume.getBasePath()).thenReturn("data4");
-    FsVolumeImpl blockedVolume = volumeList.getVolumes().get(1);
-    doAnswer(new Answer() {
-      @Override
-      public Object answer(InvocationOnMock invocationOnMock)
-          throws Throwable {
-        volumeList.removeVolume(new File("data4"), false);
-        volumeList.addVolume(newRef);
-        return null;
-      }
-    }).when(blockedVolume).checkDirs();
-
-    FsVolumeImpl brokenVolume = volumeList.getVolumes().get(2);
-    doThrow(new DiskChecker.DiskErrorException("broken"))
-        .when(brokenVolume).checkDirs();
-
-    volumeList.checkDirs();
-
-    // Since FsVolumeImpl#checkDirs() get a snapshot of the list of volumes
-    // before running removeVolume(), it is supposed to run checkDirs() on all
-    // the old volumes.
-    for (FsVolumeImpl volume : oldVolumes) {
-      verify(volume).checkDirs();
-    }
-    // New volume is not visible to checkDirs() process.
-    verify(newVolume, never()).checkDirs();
-    assertTrue(volumeList.getVolumes().contains(newVolume));
-    assertFalse(volumeList.getVolumes().contains(brokenVolume));
-    assertEquals(NUM_VOLUMES - 1, volumeList.getVolumes().size());
-  }
-
   @Test
   public void testAddVolumeFailureReleasesInUseLock() throws IOException {
     FsDatasetImpl spyDataset = spy(dataset);
@@ -417,7 +353,7 @@ public class TestFsDatasetImpl {
 
     FsDatasetTestUtil.assertFileLockReleased(badDir.toString());
   }
-  
+
   @Test
   public void testDeletingBlocks() throws IOException {
     HdfsConfiguration conf = new HdfsConfiguration();
@@ -425,7 +361,7 @@ public class TestFsDatasetImpl {
     try {
       cluster.waitActive();
       DataNode dn = cluster.getDataNodes().get(0);
-      
+
       FsDatasetSpi<?> ds = DataNodeTestUtils.getFSDataset(dn);
       ds.addBlockPool(BLOCKPOOL, conf);
       FsVolumeImpl vol;
@@ -697,6 +633,9 @@ public class TestFsDatasetImpl {
       Configuration config = new HdfsConfiguration();
       config.setLong(
           DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, 1000);
+      config.setTimeDuration(
+          DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, 0,
+          TimeUnit.MILLISECONDS);
       config.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
 
       cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
@@ -714,6 +653,8 @@ public class TestFsDatasetImpl {
           getVolume(block);
       File finalizedDir = volume.getFinalizedDir(cluster.getNamesystem()
           .getBlockPoolId());
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, filePath).get(0);
+      DatanodeInfo info = lb.getLocations()[0];
 
       if (finalizedDir.exists()) {
         // Remove write and execute access so that checkDiskErrorThread detects
@@ -724,15 +665,14 @@ public class TestFsDatasetImpl {
       Assert.assertTrue("Reference count for the volume should be greater "
           + "than 0", volume.getReferenceCount() > 0);
       // Invoke the synchronous checkDiskError method
-      dataNode.getFSDataset().checkDataDir();
+      dataNode.checkDiskError();
      // Wait until the volume reference is released so the cluster can clean up.
       GenericTestUtils.waitFor(new Supplier<Boolean>() {
           @Override public Boolean get() {
               return volume.getReferenceCount() == 0;
             }
           }, 100, 10);
-      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, filePath).get(0);
-      DatanodeInfo info = lb.getLocations()[0];
+      assertThat(dataNode.getFSDataset().getNumFailedVolumes(), is(1));
 
       try {
         out.close();

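Three things change in this test: the block's DatanodeInfo is captured before the volume is broken (afterwards the lookup might touch the failed disk), the check is triggered through DataNode#checkDiskError() now that FsDataset#checkDataDir() is gone, and the test asserts that exactly one failed volume is counted. The fault injection itself is only a permission flip on the finalized directory, roughly as below (simulateVolumeFailure is a hypothetical helper):

import java.io.File;

public class VolumeFaultInjection {
  static void simulateVolumeFailure(File finalizedDir) {
    // Revoking write and execute access makes the next disk check
    // classify the volume holding this directory as failed.
    finalizedDir.setWritable(false);
    finalizedDir.setExecutable(false);
  }
}
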
+ 0 - 32
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java

@@ -101,38 +101,6 @@ public class TestFsVolumeList {
     }
   }
 
-  @Test(timeout=30000)
-  public void testCheckDirsWithClosedVolume() throws IOException {
-    FsVolumeList volumeList = new FsVolumeList(
-        Collections.<VolumeFailureInfo>emptyList(), blockScanner, blockChooser);
-    final List<FsVolumeImpl> volumes = new ArrayList<>();
-    for (int i = 0; i < 3; i++) {
-      File curDir = new File(baseDir, "volume-" + i);
-      curDir.mkdirs();
-      FsVolumeImpl volume = new FsVolumeImpl(dataset, "storage-id", curDir,
-          conf, StorageType.DEFAULT);
-      volumes.add(volume);
-      volumeList.addVolume(volume.obtainReference());
-    }
-
-    // Close the 2nd volume.
-    volumes.get(1).setClosed();
-    try {
-      GenericTestUtils.waitFor(new Supplier<Boolean>() {
-        @Override
-        public Boolean get() {
-          return volumes.get(1).checkClosed();
-        }
-      }, 100, 3000);
-    } catch (TimeoutException e) {
-      fail("timed out while waiting for volume to be removed.");
-    } catch (InterruptedException ie) {
-      Thread.currentThread().interrupt();
-    }
-    // checkDirs() should ignore the 2nd volume since it is closed.
-    volumeList.checkDirs();
-  }
-
   @Test(timeout=30000)
   public void testReleaseVolumeRefIfNoBlockScanner() throws IOException {
     FsVolumeList volumeList = new FsVolumeList(

Some files were not shown because too many files changed in this diff.