
HDFS-7722. DataNode#checkDiskError should also remove Storage when error is found. (Lei Xu via Colin P. McCabe)

(cherry picked from commit b49c3a1813aa8c5b05fe6c02a653286c573137ca)
(cherry picked from commit 7455412a241486a86ad4232f2eaa666c6454329e)
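
At a glance, the change is: FsDatasetSpi#checkDataDir() now returns the set of failed data directories instead of throwing a DiskErrorException, and DataNode#checkDiskError() removes those volumes from the FsDataset, the DataStorage, and the configured data dirs. The sketch below is a simplified, self-contained model of that flow; every type and name in it is a stand-in for illustration, not the real Hadoop classes (only the two reworked method signatures mirror the ones in this patch).

import java.io.File;
import java.io.IOException;
import java.util.Set;

// Simplified model of the flow this patch introduces (stand-in interfaces, not the
// real Hadoop classes; only checkDataDir()/removeVolumes(...) mirror the new signatures).
class DiskErrorFlowSketch {
  interface Dataset {                       // stands in for FsDatasetSpi
    Set<File> checkDataDir();               // now returns failed dirs instead of throwing
    void removeVolumes(Set<File> volumes, boolean clearFailure);
  }
  interface VolumeStorage {                 // stands in for DataStorage
    void removeVolumes(Set<File> dirs) throws IOException;
  }

  private final Dataset data;
  private final VolumeStorage storage;
  private final Set<File> dataDirs;         // stands in for the DataNode's dataDirs/conf

  DiskErrorFlowSketch(Dataset data, VolumeStorage storage, Set<File> dataDirs) {
    this.data = data;
    this.storage = storage;
    this.dataDirs = dataDirs;
  }

  void checkDiskError() {
    Set<File> unhealthy = data.checkDataDir();
    if (unhealthy == null || unhealthy.isEmpty()) {
      return;                               // every data dir is healthy
    }
    try {
      // clearFailure=false: keep the failure info so it is still reported upstream.
      removeVolumes(unhealthy, false);
    } catch (IOException e) {
      System.err.println("Error removing unhealthy storage dirs: " + e.getMessage());
    }
    // ...then report the failed volumes (handleDiskError in the real DataNode).
  }

  private void removeVolumes(Set<File> absPaths, boolean clearFailure) throws IOException {
    data.removeVolumes(absPaths, clearFailure);  // 1. drop volumes and replicas from the dataset
    storage.removeVolumes(absPaths);             // 2. drop the storage directories
    dataDirs.removeAll(absPaths);                // 3. shrink the configured data dirs
  }
}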
Colin Patrick Mccabe 10 years ago
parent
commit
861cc05092
13 changed files with 330 additions and 133 deletions
  1. +3 -0    hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  2. +95 -14  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
  3. +5 -10   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
  4. +7 -4    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
  5. +23 -58  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
  6. +2 -1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
  7. +25 -16  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
  8. +4 -2    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
  9. +65 -0   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
  10. +70 -0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
  11. +19 -17 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
  12. +3 -4   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
  13. +9 -7   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -864,6 +864,9 @@ Release 2.7.0 - UNRELEASED
       HDFS-7806. Refactor: move StorageType from hadoop-hdfs to
       hadoop-common. (Xiaoyu Yao via Arpit Agarwal)
 
+      HDFS-7722. DataNode#checkDiskError should also remove Storage when error
+      is found. (Lei Xu via Colin P. McCabe)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 95 - 14
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -53,6 +53,7 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -73,6 +74,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -616,20 +618,16 @@ public class DataNode extends ReconfigurableBase
             errorMessageBuilder.append(
                 String.format("FAILED to ADD: %s: %s%n", volume,
                               e.toString()));
+            LOG.error("Failed to add volume: " + volume, e);
           }
         }
       }
 
-      if (!changedVolumes.deactivateLocations.isEmpty()) {
-        LOG.info("Deactivating volumes: " +
-            Joiner.on(",").join(changedVolumes.deactivateLocations));
-
-        data.removeVolumes(changedVolumes.deactivateLocations);
-        try {
-          storage.removeVolumes(changedVolumes.deactivateLocations);
-        } catch (IOException e) {
-          errorMessageBuilder.append(e.getMessage());
-        }
+      try {
+        removeVolumes(changedVolumes.deactivateLocations);
+      } catch (IOException e) {
+        errorMessageBuilder.append(e.getMessage());
+        LOG.error("Failed to remove volume: " + e.getMessage(), e);
       }
 
       if (errorMessageBuilder.length() > 0) {
@@ -642,6 +640,79 @@ public class DataNode extends ReconfigurableBase
     }
   }
 
+  /**
+   * Remove volumes from DataNode.
+   * See {@link removeVolumes(final Set<File>, boolean)} for details.
+   *
+   * @param locations the StorageLocations of the volumes to be removed.
+   * @throws IOException
+   */
+  private void removeVolumes(final Collection<StorageLocation> locations)
+    throws IOException {
+    if (locations.isEmpty()) {
+      return;
+    }
+    Set<File> volumesToRemove = new HashSet<>();
+    for (StorageLocation loc : locations) {
+      volumesToRemove.add(loc.getFile().getAbsoluteFile());
+    }
+    removeVolumes(volumesToRemove, true);
+  }
+
+  /**
+   * Remove volumes from DataNode.
+   *
+   * It does three things:
+   * <li>
+   *   <ul>Remove volumes and block info from FsDataset.</ul>
+   *   <ul>Remove volumes from DataStorage.</ul>
+   *   <ul>Reset configuration DATA_DIR and {@link dataDirs} to represent
+   *   active volumes.</ul>
+   * </li>
+   * @param absoluteVolumePaths the absolute path of volumes.
+   * @param clearFailure if true, clears the failure information related to the
+   *                     volumes.
+   * @throws IOException
+   */
+  private synchronized void removeVolumes(
+      final Set<File> absoluteVolumePaths, boolean clearFailure)
+      throws IOException {
+    for (File vol : absoluteVolumePaths) {
+      Preconditions.checkArgument(vol.isAbsolute());
+    }
+
+    if (absoluteVolumePaths.isEmpty()) {
+      return;
+    }
+
+    LOG.info(String.format("Deactivating volumes (clear failure=%b): %s",
+        clearFailure, Joiner.on(",").join(absoluteVolumePaths)));
+
+    IOException ioe = null;
+    // Remove volumes and block infos from FsDataset.
+    data.removeVolumes(absoluteVolumePaths, clearFailure);
+
+    // Remove volumes from DataStorage.
+    try {
+      storage.removeVolumes(absoluteVolumePaths);
+    } catch (IOException e) {
+      ioe = e;
+    }
+
+    // Set configuration and dataDirs to reflect volume changes.
+    for (Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext(); ) {
+      StorageLocation loc = it.next();
+      if (absoluteVolumePaths.contains(loc.getFile().getAbsoluteFile())) {
+        it.remove();
+      }
+    }
+    conf.set(DFS_DATANODE_DATA_DIR_KEY, Joiner.on(",").join(dataDirs));
+
+    if (ioe != null) {
+      throw ioe;
+    }
+  }
+
   private synchronized void setClusterId(final String nsCid, final String bpid
       ) throws IOException {
     if(clusterId != null && !clusterId.equals(nsCid)) {
@@ -3083,10 +3154,20 @@ public class DataNode extends ReconfigurableBase
    * Check the disk error
    */
   private void checkDiskError() {
-    try {
-      data.checkDataDir();
-    } catch (DiskErrorException de) {
-      handleDiskError(de.getMessage());
+    Set<File> unhealthyDataDirs = data.checkDataDir();
+    if (unhealthyDataDirs != null && !unhealthyDataDirs.isEmpty()) {
+      try {
+        // Remove all unhealthy volumes from DataNode.
+        removeVolumes(unhealthyDataDirs, false);
+      } catch (IOException e) {
+        LOG.warn("Error occurred when removing unhealthy storage dirs: "
+            + e.getMessage(), e);
+      }
+      StringBuilder sb = new StringBuilder("DataNode failed volumes:");
+      for (File dataDir : unhealthyDataDirs) {
+        sb.append(dataDir.getAbsolutePath() + ";");
+      }
+      handleDiskError(sb.toString());
     }
   }
 

+ 5 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java

@@ -404,28 +404,23 @@ public class DataStorage extends Storage {
   }
 
   /**
-   * Remove volumes from DataStorage. All volumes are removed even when the
+   * Remove storage dirs from DataStorage. All storage dirs are removed even when the
    * IOException is thrown.
    *
-   * @param locations a collection of volumes.
+   * @param dirsToRemove a set of storage directories to be removed.
    * @throws IOException if I/O error when unlocking storage directory.
    */
-  synchronized void removeVolumes(Collection<StorageLocation> locations)
+  synchronized void removeVolumes(final Set<File> dirsToRemove)
       throws IOException {
-    if (locations.isEmpty()) {
+    if (dirsToRemove.isEmpty()) {
       return;
     }
 
-    Set<File> dataDirs = new HashSet<File>();
-    for (StorageLocation sl : locations) {
-      dataDirs.add(sl.getFile());
-    }
-
     StringBuilder errorMsgBuilder = new StringBuilder();
     for (Iterator<StorageDirectory> it = this.storageDirs.iterator();
          it.hasNext(); ) {
       StorageDirectory sd = it.next();
-      if (dataDirs.contains(sd.getRoot())) {
+      if (dirsToRemove.contains(sd.getRoot())) {
         // Remove the block pool level storage first.
         for (Map.Entry<String, BlockPoolSliceStorage> entry :
             this.bpStorageMap.entrySet()) {

+ 7 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java

@@ -27,6 +27,7 @@ import java.io.InputStream;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -113,9 +114,11 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * If the FSDataset supports block scanning, this function removes
    * the volumes from the block scanner.
    *
-   * @param volumes      The storage locations of the volumes to remove.
+   * @param volumes  The paths of the volumes to be removed.
+   * @param clearFailure set true to clear the failure information about the
+   *                     volumes.
    */
-  public void removeVolumes(Collection<StorageLocation> volumes);
+  public void removeVolumes(Set<File> volumes, boolean clearFailure);
 
   /** @return a storage with the given storage ID */
   public DatanodeStorage getStorage(final String storageUuid);
@@ -388,9 +391,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
 
     /**
      * Check if all the data directories are healthy
-     * @throws DiskErrorException
+     * @return A set of unhealthy data directories.
      */
-  public void checkDataDir() throws DiskErrorException;
+  public Set<File> checkDataDir();
 
   /**
    * Shutdown the FSDataset

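Read as a contract, a null or empty result from checkDataDir() means all data directories are healthy; otherwise the caller is expected to hand the set straight to removeVolumes(). The snippet below is a hypothetical consumer, not code from this patch; only the two method signatures are taken from the interface as declared above.

import java.io.File;
import java.util.Set;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;

// Hypothetical consumer of the revised FsDatasetSpi contract (illustration only).
class FsDatasetSpiUsageSketch {
  static void handleFailedDirs(FsDatasetSpi<?> dataset) {
    Set<File> unhealthy = dataset.checkDataDir();   // null or empty: every data dir is healthy
    if (unhealthy != null && !unhealthy.isEmpty()) {
      // clearFailure=false keeps the failure visible in volume-failure reports.
      dataset.removeVolumes(unhealthy, false);
    }
  }
}
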
+ 23 - 58
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -447,41 +447,42 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   /**
-   * Removes a collection of volumes from FsDataset.
-   * @param volumes the root directories of the volumes.
+   * Removes a set of volumes from FsDataset.
+   * @param volumesToRemove a set of absolute root path of each volume.
+   * @param clearFailure set true to clear failure information.
    *
    * DataNode should call this function before calling
    * {@link DataStorage#removeVolumes(java.util.Collection)}.
    */
   @Override
-  public synchronized void removeVolumes(Collection<StorageLocation> volumes) {
-    Set<String> volumeSet = new HashSet<>();
-    for (StorageLocation sl : volumes) {
-      volumeSet.add(sl.getFile().getAbsolutePath());
+  public synchronized void removeVolumes(
+      Set<File> volumesToRemove, boolean clearFailure) {
+    // Make sure that all volumes are absolute path.
+    for (File vol : volumesToRemove) {
+      Preconditions.checkArgument(vol.isAbsolute(),
+          String.format("%s is not absolute path.", vol.getPath()));
     }
     for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
       Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
-      String volume = sd.getRoot().getAbsolutePath();
-      if (volumeSet.contains(volume)) {
-        LOG.info("Removing " + volume + " from FsDataset.");
+      final File absRoot = sd.getRoot().getAbsoluteFile();
+      if (volumesToRemove.contains(absRoot)) {
+        LOG.info("Removing " + absRoot + " from FsDataset.");
 
         // Disable the volume from the service.
         asyncDiskService.removeVolume(sd.getCurrentDir());
-        this.volumes.removeVolume(sd.getRoot());
+        volumes.removeVolume(absRoot, clearFailure);
 
         // Removed all replica information for the blocks on the volume. Unlike
         // updating the volumeMap in addVolume(), this operation does not scan
         // disks.
         for (String bpid : volumeMap.getBlockPoolList()) {
-          List<Block> blocks = new ArrayList<Block>();
           for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator();
-              it.hasNext(); ) {
+               it.hasNext(); ) {
             ReplicaInfo block = it.next();
-            String absBasePath =
-                  new File(block.getVolume().getBasePath()).getAbsolutePath();
-            if (absBasePath.equals(volume)) {
+            final File absBasePath =
+                new File(block.getVolume().getBasePath()).getAbsoluteFile();
+            if (absBasePath.equals(absRoot)) {
               invalidate(bpid, block);
-              blocks.add(block);
               it.remove();
             }
           }
@@ -1971,50 +1972,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   /**
    * check if a data directory is healthy
-   * if some volumes failed - make sure to remove all the blocks that belong
-   * to these volumes
-   * @throws DiskErrorException
+   *
+   * if some volumes failed - the caller must remove all the blocks that belong
+   * to these failed volumes.
+   * @return the failed volumes. Returns null if no volume failed.
    */
   @Override // FsDatasetSpi
-  public void checkDataDir() throws DiskErrorException {
-    long totalBlocks=0, removedBlocks=0;
-    List<FsVolumeImpl> failedVols =  volumes.checkDirs();
-    
-    // If there no failed volumes return
-    if (failedVols == null) { 
-      return;
-    }
-    
-    // Otherwise remove blocks for the failed volumes
-    long mlsec = Time.now();
-    synchronized (this) {
-      for (FsVolumeImpl fv: failedVols) {
-        for (String bpid : fv.getBlockPoolList()) {
-          Iterator<ReplicaInfo> ib = volumeMap.replicas(bpid).iterator();
-          while(ib.hasNext()) {
-            ReplicaInfo b = ib.next();
-            totalBlocks++;
-            // check if the volume block belongs to still valid
-            if(b.getVolume() == fv) {
-              LOG.warn("Removing replica " + bpid + ":" + b.getBlockId()
-                  + " on failed volume " + fv.getCurrentDir().getAbsolutePath());
-              ib.remove();
-              removedBlocks++;
-            }
-          }
-        }
-      }
-    } // end of sync
-    mlsec = Time.now() - mlsec;
-    LOG.warn("Removed " + removedBlocks + " out of " + totalBlocks +
-        "(took " + mlsec + " millisecs)");
-
-    // report the error
-    StringBuilder sb = new StringBuilder();
-    for (FsVolumeImpl fv : failedVols) {
-      sb.append(fv.getCurrentDir().getAbsolutePath() + ";");
-    }
-    throw new DiskErrorException("DataNode failed volumes:" + sb);
+  public Set<File> checkDataDir() {
+   return volumes.checkDirs();
   }
     
 

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java

@@ -289,7 +289,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
     }
   }
 
-  long getDfsUsed() throws IOException {
+  @VisibleForTesting
+  public long getDfsUsed() throws IOException {
     long dfsUsed = 0;
     synchronized(dataset) {
       for(BlockPoolSlice s : bpSlices.values()) {

+ 25 - 16
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java

@@ -24,10 +24,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.collect.Lists;
@@ -218,16 +220,15 @@ class FsVolumeList {
   }
 
   /**
-   * Calls {@link FsVolumeImpl#checkDirs()} on each volume, removing any
-   * volumes from the active list that result in a DiskErrorException.
+   * Calls {@link FsVolumeImpl#checkDirs()} on each volume.
    * 
    * Use checkDirsMutext to allow only one instance of checkDirs() call
    *
-   * @return list of all the removed volumes.
+   * @return list of all the failed volumes.
    */
-  List<FsVolumeImpl> checkDirs() {
+  Set<File> checkDirs() {
     synchronized(checkDirsMutex) {
-      ArrayList<FsVolumeImpl> removedVols = null;
+      Set<File> failedVols = null;
       
       // Make a copy of volumes for performing modification 
       final List<FsVolumeImpl> volumeList = getVolumes();
@@ -238,12 +239,12 @@ class FsVolumeList {
           fsv.checkDirs();
         } catch (DiskErrorException e) {
           FsDatasetImpl.LOG.warn("Removing failed volume " + fsv + ": ", e);
-          if (removedVols == null) {
-            removedVols = new ArrayList<>(1);
+          if (failedVols == null) {
+            failedVols = new HashSet<>(1);
           }
-          removedVols.add(fsv);
-          removeVolume(fsv);
+          failedVols.add(new File(fsv.getBasePath()).getAbsoluteFile());
           addVolumeFailureInfo(fsv);
+          removeVolume(fsv);
         } catch (ClosedChannelException e) {
           FsDatasetImpl.LOG.debug("Caught exception when obtaining " +
             "reference count on closed volume", e);
@@ -252,12 +253,12 @@ class FsVolumeList {
         }
       }
       
-      if (removedVols != null && removedVols.size() > 0) {
-        FsDatasetImpl.LOG.warn("Completed checkDirs. Removed " + removedVols.size()
-            + " volumes. Current volumes: " + this);
+      if (failedVols != null && failedVols.size() > 0) {
+        FsDatasetImpl.LOG.warn("Completed checkDirs. Found " + failedVols.size()
+            + " failure volumes.");
       }
 
-      return removedVols;
+      return failedVols;
     }
   }
 
@@ -290,6 +291,9 @@ class FsVolumeList {
     if (blockScanner != null) {
       blockScanner.addVolumeScanner(ref);
     }
+    // If the volume is used to replace a failed volume, it needs to reset the
+    // volume failure info for this volume.
+    removeVolumeFailureInfo(new File(ref.getVolume().getBasePath()));
     FsDatasetImpl.LOG.info("Added new volume: " +
         ref.getVolume().getStorageID());
   }
@@ -337,8 +341,9 @@ class FsVolumeList {
   /**
    * Dynamically remove volume in the list.
    * @param volume the volume to be removed.
+   * @param clearFailure set true to remove failure info for this volume.
    */
-  void removeVolume(File volume) {
+  void removeVolume(File volume, boolean clearFailure) {
     // Make a copy of volumes to remove one volume.
     final FsVolumeImpl[] curVolumes = volumes.get();
     final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
@@ -352,7 +357,9 @@ class FsVolumeList {
         removeVolume(fsVolume);
       }
     }
-    removeVolumeFailureInfo(volume);
+    if (clearFailure) {
+      removeVolumeFailureInfo(volume);
+    }
   }
 
   VolumeFailureInfo[] getVolumeFailureInfos() {
@@ -366,7 +373,9 @@ class FsVolumeList {
   }
 
   private void addVolumeFailureInfo(FsVolumeImpl vol) {
-    addVolumeFailureInfo(new VolumeFailureInfo(vol.getBasePath(), Time.now(),
+    addVolumeFailureInfo(new VolumeFailureInfo(
+        new File(vol.getBasePath()).getAbsolutePath(),
+        Time.now(),
         vol.getCapacity()));
   }
 

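The clearFailure flag is the subtle point in this file: a volume dropped because its disk check failed must keep its VolumeFailureInfo so the failure is still reported, while a volume removed (or re-added on the same path) through reconfiguration should have that record cleared. Below is a minimal stand-in sketch of that bookkeeping, assuming a plain map in place of the real failure-info tracking; none of these names are the actual FsVolumeList members.

import java.io.File;
import java.util.HashMap;
import java.util.Map;

// Stand-in sketch of the failure-info bookkeeping (not the real FsVolumeList).
class VolumeFailureSketch {
  private final Map<File, Long> failureInfo = new HashMap<>();  // base path -> failure time

  void markFailed(File volume) {
    failureInfo.put(volume, System.currentTimeMillis());        // like addVolumeFailureInfo(...)
  }

  void removeVolume(File volume, boolean clearFailure) {
    // ...detach the volume from the active list (elided)...
    if (clearFailure) {
      failureInfo.remove(volume);   // reconfiguration path: forget the old failure
    }
    // The disk-check path passes clearFailure=false, so the failure keeps
    // showing up in the volume-failure report until the directory is replaced.
  }

  void addVolume(File volume) {
    // A volume added back on the same path replaces a previously failed one,
    // so its stale failure record is cleared (mirrors FsVolumeList#addVolume).
    failureInfo.remove(volume);
  }
}
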
+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -30,6 +30,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
@@ -958,8 +959,9 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   @Override
-  public void checkDataDir() throws DiskErrorException {
+  public Set<File> checkDataDir() {
     // nothing to check for simulated data set
+    return null;
   }
 
   @Override // FsDatasetSpi
@@ -1280,7 +1282,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   @Override
-  public synchronized void removeVolumes(Collection<StorageLocation> volumes) {
+  public synchronized void removeVolumes(Set<File> volumes, boolean clearFailure) {
     throw new UnsupportedOperationException();
   }
 

+ 65 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockMissingException;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
@@ -95,6 +97,8 @@ public class TestDataNodeHotSwapVolumes {
     conf.setInt(DFSConfigKeys.DFS_DF_INTERVAL_KEY, 1000);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
         1000);
+    /* Allow 1 volume failure */
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
 
     MiniDFSNNTopology nnTopology =
         MiniDFSNNTopology.simpleFederatedTopology(numNameNodes);
@@ -646,4 +650,65 @@ public class TestDataNodeHotSwapVolumes {
     // this directory were removed from the previous step.
     dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, oldDataDir);
   }
+
+  /** Get the FsVolume on the given basePath */
+  private FsVolumeImpl getVolume(DataNode dn, File basePath) {
+    for (FsVolumeSpi vol : dn.getFSDataset().getVolumes()) {
+      if (vol.getBasePath().equals(basePath.getPath())) {
+        return (FsVolumeImpl)vol;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Verify that {@link DataNode#checkDiskErrors()} removes all metadata in
+   * DataNode upon a volume failure. Thus we can run reconfig on the same
+   * configuration to reload the new volume on the same directory as the failed one.
+   */
+  @Test(timeout=60000)
+  public void testDirectlyReloadAfterCheckDiskError()
+      throws IOException, TimeoutException, InterruptedException,
+      ReconfigurationException {
+    startDFSCluster(1, 2);
+    createFile(new Path("/test"), 32, (short)2);
+
+    DataNode dn = cluster.getDataNodes().get(0);
+    final String oldDataDir = dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY);
+    File dirToFail = new File(cluster.getDataDirectory(), "data1");
+
+    FsVolumeImpl failedVolume = getVolume(dn, dirToFail);
+    assertTrue("No FsVolume was found for " + dirToFail,
+        failedVolume != null);
+    long used = failedVolume.getDfsUsed();
+
+    try {
+      assertTrue("Couldn't chmod local vol: " + dirToFail,
+          FileUtil.setExecutable(dirToFail, false));
+      // Call and wait DataNode to detect disk failure.
+      long lastDiskErrorCheck = dn.getLastDiskErrorCheck();
+      dn.checkDiskErrorAsync();
+      while (dn.getLastDiskErrorCheck() == lastDiskErrorCheck) {
+        Thread.sleep(100);
+      }
+
+      createFile(new Path("/test1"), 32, (short)2);
+      assertEquals(used, failedVolume.getDfsUsed());
+    } finally {
+      // Need to restore the mode on dirToFail. Otherwise, if an Exception
+      // is thrown above, the following tests can not delete this data directory
+      // and thus fail to start MiniDFSCluster.
+      assertTrue("Couldn't restore executable for: " + dirToFail,
+          FileUtil.setExecutable(dirToFail, true));
+    }
+
+    dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, oldDataDir);
+
+    createFile(new Path("/test2"), 32, (short)2);
+    FsVolumeImpl restoredVolume = getVolume(dn, dirToFail);
+    assertTrue(restoredVolume != null);
+    assertTrue(restoredVolume != failedVolume);
+    // More data has been written to this volume.
+    assertTrue(restoredVolume.getDfsUsed() > used);
+  }
 }

+ 70 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
@@ -29,6 +31,7 @@ import java.net.Socket;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -55,6 +58,10 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -198,6 +205,69 @@ public class TestDataNodeVolumeFailure {
         " is created and replicated");
   }
 
+  /**
+   * Test that DataStorage and BlockPoolSliceStorage remove the failed volume
+   * after failure.
+   */
+  @Test(timeout=150000)
+  public void testFailedVolumeBeingRemovedFromDataNode()
+      throws InterruptedException, IOException, TimeoutException {
+    Path file1 = new Path("/test1");
+    DFSTestUtil.createFile(fs, file1, 1024, (short) 2, 1L);
+    DFSTestUtil.waitReplication(fs, file1, (short) 2);
+
+    File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1));
+    assertTrue(FileUtil.setExecutable(dn0Vol1, false));
+    DataNode dn0 = cluster.getDataNodes().get(0);
+    long lastDiskErrorCheck = dn0.getLastDiskErrorCheck();
+    dn0.checkDiskErrorAsync();
+    // Wait checkDiskError thread finish to discover volume failure.
+    while (dn0.getLastDiskErrorCheck() == lastDiskErrorCheck) {
+      Thread.sleep(100);
+    }
+
+    // Verify dn0Vol1 has been completely removed from DN0.
+    // 1. dn0Vol1 is removed from DataStorage.
+    DataStorage storage = dn0.getStorage();
+    assertEquals(1, storage.getNumStorageDirs());
+    for (int i = 0; i < storage.getNumStorageDirs(); i++) {
+      Storage.StorageDirectory sd = storage.getStorageDir(i);
+      assertFalse(sd.getRoot().getAbsolutePath().startsWith(
+          dn0Vol1.getAbsolutePath()
+      ));
+    }
+    final String bpid = cluster.getNamesystem().getBlockPoolId();
+    BlockPoolSliceStorage bpsStorage = storage.getBPStorage(bpid);
+    assertEquals(1, bpsStorage.getNumStorageDirs());
+    for (int i = 0; i < bpsStorage.getNumStorageDirs(); i++) {
+      Storage.StorageDirectory sd = bpsStorage.getStorageDir(i);
+      assertFalse(sd.getRoot().getAbsolutePath().startsWith(
+          dn0Vol1.getAbsolutePath()
+      ));
+    }
+
+    // 2. dn0Vol1 is removed from FsDataset
+    FsDatasetSpi<? extends FsVolumeSpi> data = dn0.getFSDataset();
+    for (FsVolumeSpi volume : data.getVolumes()) {
+      assertNotEquals(new File(volume.getBasePath()).getAbsoluteFile(),
+          dn0Vol1.getAbsoluteFile());
+    }
+
+    // 3. all blocks on dn0Vol1 have been removed.
+    for (ReplicaInfo replica : FsDatasetTestUtil.getReplicas(data, bpid)) {
+      assertNotNull(replica.getVolume());
+      assertNotEquals(
+          new File(replica.getVolume().getBasePath()).getAbsoluteFile(),
+          dn0Vol1.getAbsoluteFile());
+    }
+
+    // 4. dn0Vol1 is not in DN0's configuration and dataDirs anymore.
+    String[] dataDirStrs =
+        dn0.getConf().get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
+    assertEquals(1, dataDirStrs.length);
+    assertFalse(dataDirStrs[0].contains(dn0Vol1.getAbsolutePath()));
+  }
+
   /**
    * Test that there are under replication blocks after vol failures
    */

+ 19 - 17
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java

@@ -403,23 +403,6 @@ public class TestDataNodeVolumeFailureReporting {
     checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
     checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
 
-    // Reconfigure each DataNode to remove its failed volumes.
-    reconfigureDataNode(dns.get(0), dn1Vol2);
-    reconfigureDataNode(dns.get(1), dn2Vol2);
-
-    DataNodeTestUtils.triggerHeartbeat(dns.get(0));
-    DataNodeTestUtils.triggerHeartbeat(dns.get(1));
-
-    checkFailuresAtDataNode(dns.get(0), 1, true);
-    checkFailuresAtDataNode(dns.get(1), 1, true);
-
-    // NN sees reduced capacity, but no volume failures.
-    DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 0,
-        origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
-    checkAggregateFailuresAtNameNode(true, 0);
-    checkFailuresAtNameNode(dm, dns.get(0), true);
-    checkFailuresAtNameNode(dm, dns.get(1), true);
-
     // Reconfigure again to try to add back the failed volumes.
     reconfigureDataNode(dns.get(0), dn1Vol1, dn1Vol2);
     reconfigureDataNode(dns.get(1), dn2Vol1, dn2Vol2);
@@ -460,6 +443,25 @@ public class TestDataNodeVolumeFailureReporting {
     checkAggregateFailuresAtNameNode(false, 2);
     checkFailuresAtNameNode(dm, dns.get(0), false, dn1Vol1.getAbsolutePath());
     checkFailuresAtNameNode(dm, dns.get(1), false, dn2Vol1.getAbsolutePath());
+
+    // Replace failed volume with healthy volume and run reconfigure DataNode.
+    // The failed volume information should be cleared.
+    assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn1Vol1, true));
+    assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn2Vol1, true));
+    reconfigureDataNode(dns.get(0), dn1Vol1, dn1Vol2);
+    reconfigureDataNode(dns.get(1), dn2Vol1, dn2Vol2);
+
+    DataNodeTestUtils.triggerHeartbeat(dns.get(0));
+    DataNodeTestUtils.triggerHeartbeat(dns.get(1));
+
+    checkFailuresAtDataNode(dns.get(0), 1, true);
+    checkFailuresAtDataNode(dns.get(1), 1, true);
+
+    DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 0,
+        origCapacity, WAIT_FOR_HEARTBEATS);
+    checkAggregateFailuresAtNameNode(true, 0);
+    checkFailuresAtNameNode(dm, dns.get(0), true);
+    checkFailuresAtNameNode(dm, dns.get(1), true);
   }
 
   /**

+ 3 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java

@@ -61,8 +61,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   }
 
   @Override
-  public void removeVolumes(Collection<StorageLocation> volumes) {
-
+  public void removeVolumes(Set<File> volumes, boolean clearFailure) {
   }
 
   @Override
@@ -243,8 +242,8 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   }
 
   @Override
-  public void checkDataDir() throws DiskErrorException {
-    throw new DiskChecker.DiskErrorException(null);
+  public Set<File> checkDataDir() {
+    return null;
   }
 
   @Override

+ 9 - 7
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java

@@ -195,10 +195,10 @@ public class TestFsDatasetImpl {
     final String[] dataDirs =
         conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
     final String volumePathToRemove = dataDirs[0];
-    List<StorageLocation> volumesToRemove = new ArrayList<StorageLocation>();
-    volumesToRemove.add(StorageLocation.parse(volumePathToRemove));
+    Set<File> volumesToRemove = new HashSet<>();
+    volumesToRemove.add(StorageLocation.parse(volumePathToRemove).getFile());
 
-    dataset.removeVolumes(volumesToRemove);
+    dataset.removeVolumes(volumesToRemove, true);
     int expectedNumVolumes = dataDirs.length - 1;
     assertEquals("The volume has been removed from the volumeList.",
         expectedNumVolumes, dataset.getVolumes().size());
@@ -206,7 +206,7 @@ public class TestFsDatasetImpl {
         expectedNumVolumes, dataset.storageMap.size());
 
     try {
-      dataset.asyncDiskService.execute(volumesToRemove.get(0).getFile(),
+      dataset.asyncDiskService.execute(volumesToRemove.iterator().next(),
           new Runnable() {
             @Override
             public void run() {}
@@ -248,8 +248,9 @@ public class TestFsDatasetImpl {
 
     when(storage.getNumStorageDirs()).thenReturn(numExistingVolumes + 1);
     when(storage.getStorageDir(numExistingVolumes)).thenReturn(sd);
-    List<StorageLocation> volumesToRemove = Arrays.asList(loc);
-    dataset.removeVolumes(volumesToRemove);
+    Set<File> volumesToRemove = new HashSet<>();
+    volumesToRemove.add(loc.getFile());
+    dataset.removeVolumes(volumesToRemove, true);
     assertEquals(numExistingVolumes, dataset.getVolumes().size());
   }
 
@@ -278,12 +279,13 @@ public class TestFsDatasetImpl {
     final FsVolumeImpl newVolume = mock(FsVolumeImpl.class);
     final FsVolumeReference newRef = mock(FsVolumeReference.class);
     when(newRef.getVolume()).thenReturn(newVolume);
+    when(newVolume.getBasePath()).thenReturn("data4");
     FsVolumeImpl blockedVolume = volumeList.getVolumes().get(1);
     doAnswer(new Answer() {
       @Override
       public Object answer(InvocationOnMock invocationOnMock)
           throws Throwable {
-        volumeList.removeVolume(new File("data4"));
+        volumeList.removeVolume(new File("data4"), false);
         volumeList.addVolume(newRef);
         return null;
       }