
HDFS-11340. DataNode reconfigure for disks doesn't remove the failed volumes. (Manoj Govindassamy via lei)

Lei Xu 8 years ago
parent
commit
ee1d3105c2

+ 56 - 18
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -650,49 +650,86 @@ public class DataNode extends ReconfigurableBase
   ChangedVolumes parseChangedVolumes(String newVolumes) throws IOException {
     Configuration conf = new Configuration();
     conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes);
-    List<StorageLocation> locations = getStorageLocations(conf);
+    List<StorageLocation> newStorageLocations = getStorageLocations(conf);
 
-    if (locations.isEmpty()) {
+    if (newStorageLocations.isEmpty()) {
       throw new IOException("No directory is specified.");
     }
 
-    // Use the existing StorageLocation to detect storage type changes.
-    Map<String, StorageLocation> existingLocations = new HashMap<>();
+    // Use the existing storage locations from the current conf
+    // to detect new storage additions or removals.
+    Map<String, StorageLocation> existingStorageLocations = new HashMap<>();
     for (StorageLocation loc : getStorageLocations(getConf())) {
-      existingLocations.put(loc.getFile().getCanonicalPath(), loc);
+      existingStorageLocations.put(loc.getFile().getCanonicalPath(), loc);
     }
 
     ChangedVolumes results = new ChangedVolumes();
-    results.newLocations.addAll(locations);
+    results.newLocations.addAll(newStorageLocations);
 
     for (Iterator<Storage.StorageDirectory> it = storage.dirIterator();
-         it.hasNext(); ) {
+         it.hasNext();) {
       Storage.StorageDirectory dir = it.next();
       boolean found = false;
-      for (Iterator<StorageLocation> sl = results.newLocations.iterator();
-           sl.hasNext(); ) {
-        StorageLocation location = sl.next();
-        if (location.getFile().getCanonicalPath().equals(
+      for (Iterator<StorageLocation> newLocationItr =
+           results.newLocations.iterator();
+           newLocationItr.hasNext();) {
+        StorageLocation newLocation = newLocationItr.next();
+        if (newLocation.getFile().getCanonicalPath().equals(
             dir.getRoot().getCanonicalPath())) {
-          sl.remove();
-          StorageLocation old = existingLocations.get(
-              location.getFile().getCanonicalPath());
+          StorageLocation old = existingStorageLocations.get(
+              newLocation.getFile().getCanonicalPath());
           if (old != null &&
-              old.getStorageType() != location.getStorageType()) {
+              old.getStorageType() != newLocation.getStorageType()) {
             throw new IOException("Changing storage type is not allowed.");
           }
-          results.unchangedLocations.add(location);
+          // Update the unchanged locations as this location
+          // from the new conf is really not a new one.
+          newLocationItr.remove();
+          results.unchangedLocations.add(newLocation);
           found = true;
           break;
         }
       }
 
+      // The new conf doesn't have a storage location that is in the
+      // current storage locations. Add it to the deactivateLocations list.
       if (!found) {
+        LOG.info("Deactivation request received for active volume: "
+            + dir.getRoot().toString());
         results.deactivateLocations.add(
             StorageLocation.parse(dir.getRoot().toString()));
       }
     }
 
+    // Use the failed storage locations from the current conf
+    // to detect removals in the new conf.
+    if (getFSDataset().getNumFailedVolumes() > 0) {
+      for (String failedStorageLocation : getFSDataset()
+          .getVolumeFailureSummary().getFailedStorageLocations()) {
+        boolean found = false;
+        for (Iterator<StorageLocation> newLocationItr =
+             results.newLocations.iterator(); newLocationItr.hasNext();) {
+          StorageLocation newLocation = newLocationItr.next();
+          if (newLocation.getFile().getCanonicalPath().equals(
+              failedStorageLocation)) {
+            // The failed storage is being re-added. DataNode#refreshVolumes()
+            // will take care of re-assessing it.
+            found = true;
+            break;
+          }
+        }
+
+        // The new conf doesn't have this failed storage location.
+        // Add it to the deactivateLocations list.
+        if (!found) {
+          LOG.info("Deactivation request received for failed volume: "
+              + failedStorageLocation);
+          results.deactivateLocations.add(StorageLocation.parse(
+              failedStorageLocation));
+        }
+      }
+    }
+
     return results;
   }
 
@@ -715,8 +752,9 @@ public class DataNode extends ReconfigurableBase
     }
 
     try {
-      if (numOldDataDirs + changedVolumes.newLocations.size() -
-          changedVolumes.deactivateLocations.size() <= 0) {
+      if (numOldDataDirs + getFSDataset().getNumFailedVolumes()
+          + changedVolumes.newLocations.size()
+          - changedVolumes.deactivateLocations.size() <= 0) {
         throw new IOException("Attempt to remove all volumes.");
       }
       if (!changedVolumes.newLocations.isEmpty()) {

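The DataNode.java hunk above makes parseChangedVolumes() sort every configured directory into one of three buckets (new, unchanged, to-deactivate) and, new with this change, also walks the failed-volume list so that a failed directory dropped from the new configuration is queued for deactivation as well. The standalone sketch below mirrors that classification with plain string paths and collections; the class and method names are illustrative only and are not part of the Hadoop API.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative sketch (not Hadoop code) of the three-way volume classification. */
public class VolumeChangeSketch {

  static final class Changes {
    final List<String> newLocations = new ArrayList<>();
    final List<String> unchangedLocations = new ArrayList<>();
    final List<String> deactivateLocations = new ArrayList<>();
  }

  static Changes classify(Set<String> active, Set<String> failed,
      Set<String> requested) {
    Changes result = new Changes();
    result.newLocations.addAll(requested);

    // Active directories missing from the new conf are deactivated;
    // active directories present in both lists stay as "unchanged".
    for (String dir : active) {
      if (result.newLocations.remove(dir)) {
        result.unchangedLocations.add(dir);
      } else {
        result.deactivateLocations.add(dir);
      }
    }

    // The piece HDFS-11340 adds: failed directories that the new conf no
    // longer lists must also be deactivated, otherwise they linger in the
    // failure report after the hot swap. A failed directory that is still
    // listed is left alone; refreshVolumes() re-assesses it.
    for (String dir : failed) {
      if (!requested.contains(dir)) {
        result.deactivateLocations.add(dir);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Set<String> active = new HashSet<>(Arrays.asList("/data1"));
    Set<String> failed = new HashSet<>(Arrays.asList("/data2"));
    Set<String> requested = new HashSet<>(Arrays.asList("/data1", "/data3"));

    Changes c = classify(active, failed, requested);
    System.out.println("new         = " + c.newLocations);        // [/data3]
    System.out.println("unchanged   = " + c.unchangedLocations);  // [/data1]
    System.out.println("deactivated = " + c.deactivateLocations); // [/data2]
  }
}
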
+ 16 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -523,9 +523,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * @param clearFailure set true to clear failure information.
    */
   @Override
-  public void removeVolumes(Set<File> volumesToRemove, boolean clearFailure) {
+  public void removeVolumes(Set<File> storageLocsToRemove,
+      boolean clearFailure) {
+    Collection<File> storageLocationsToRemove =
+        new ArrayList<>(storageLocsToRemove);
     // Make sure that all volumes are absolute path.
-    for (File vol : volumesToRemove) {
+    for (File vol : storageLocationsToRemove) {
       Preconditions.checkArgument(vol.isAbsolute(),
           String.format("%s is not absolute path.", vol.getPath()));
     }
@@ -536,7 +539,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
         Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
         final File absRoot = sd.getRoot().getAbsoluteFile();
-        if (volumesToRemove.contains(absRoot)) {
+        if (storageLocationsToRemove.contains(absRoot)) {
           LOG.info("Removing " + absRoot + " from FsDataset.");
 
           // Disable the volume from the service.
@@ -563,6 +566,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           }
 
           storageToRemove.add(sd.getStorageUuid());
+          storageLocationsToRemove.remove(absRoot);
+        }
+      }
+
+      // A reconfigure can request removal of a storage location that was
+      // already removed when its failure was detected by
+      // DataNode#checkDiskErrorAsync; remove it from the failed volume list.
+      if (clearFailure) {
+        for (File storageLocToRemove : storageLocationsToRemove) {
+          volumes.removeVolumeFailureInfo(storageLocToRemove);
         }
       }
       setupAsyncLazyPersistThreads();

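In the FsDatasetImpl.java hunk, removeVolumes() now works on a copy of the requested set, crosses off each root it matches against an active storage directory, and, when clearFailure is set, purges the failure record of whatever is left over, i.e. directories that were already taken out of service by the async disk checker. A minimal sketch of that leftover handling, with simplified types and illustrative names (not the Hadoop classes), could look like this:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch of the leftover handling added to removeVolumes() (names illustrative). */
public class RemoveLeftoverSketch {

  static void removeVolumes(Collection<String> requested, List<String> activeRoots,
      Map<String, String> failureInfoByRoot, boolean clearFailure) {
    // Work on a copy so the caller's collection is untouched and we can
    // track which removal requests never matched an active volume.
    Collection<String> toRemove = new ArrayList<>(requested);

    for (String root : new ArrayList<>(activeRoots)) {
      if (toRemove.contains(root)) {
        activeRoots.remove(root);   // stands in for disabling the live volume
        toRemove.remove(root);      // matched: no longer a leftover
      }
    }

    // Anything still in toRemove was never an active volume -- typically a
    // directory the async disk checker already dropped after a failure.
    // Clearing its failure record here is what lets a reconfigure reset
    // the failed-volume count, which is the point of HDFS-11340.
    if (clearFailure) {
      for (String leftover : toRemove) {
        failureInfoByRoot.remove(leftover);
      }
    }
  }

  public static void main(String[] args) {
    List<String> active = new ArrayList<>(Arrays.asList("/data1"));
    Map<String, String> failures = new HashMap<>();
    failures.put("/data2", "failed earlier by the disk checker");

    // /data2 is requested for removal but is no longer active: its failure
    // record is cleared; /data1 is removed from the active list.
    removeVolumes(Arrays.asList("/data1", "/data2"), active, failures, true);
    System.out.println(active);   // []
    System.out.println(failures); // {}
  }
}
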
+ 10 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java

@@ -369,8 +369,15 @@ class FsVolumeList {
   }
 
   void addVolumeFailureInfo(VolumeFailureInfo volumeFailureInfo) {
-    volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
-        volumeFailureInfo);
+    // There could be redundant requests to add the same failed volume
+    // because of repeated DataNode reconfigures with the same list of
+    // volumes. Ignore the update so as to preserve the old failed
+    // capacity details in the map.
+    if (!volumeFailureInfos.containsKey(volumeFailureInfo
+        .getFailedStorageLocation())) {
+      volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
+          volumeFailureInfo);
+    }
   }
 
   private void addVolumeFailureInfo(FsVolumeImpl vol) {
@@ -380,7 +387,7 @@ class FsVolumeList {
         vol.getCapacity()));
   }
 
-  private void removeVolumeFailureInfo(File vol) {
+  void removeVolumeFailureInfo(File vol) {
     volumeFailureInfos.remove(vol.getAbsolutePath());
   }
 

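The FsVolumeList.java hunk keeps the first failure record for a storage location and ignores later duplicates, so the originally measured lost capacity survives repeated reconfigures with the same volume list. Functionally this is a put-if-absent guard; a simplified sketch (types and names illustrative, not the Hadoop classes):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Sketch of the keep-first-failure-record behaviour (simplified types). */
public class FailureInfoSketch {

  static final class VolumeFailureInfo {
    final String location;
    final long estimatedCapacityLost;
    VolumeFailureInfo(String location, long estimatedCapacityLost) {
      this.location = location;
      this.estimatedCapacityLost = estimatedCapacityLost;
    }
  }

  private final ConcurrentMap<String, VolumeFailureInfo> failures =
      new ConcurrentHashMap<>();

  void addVolumeFailureInfo(VolumeFailureInfo info) {
    // Keep the record from the first time the failure was observed; a repeat
    // reconfigure with the same volume list must not reset the lost capacity.
    failures.putIfAbsent(info.location, info);
  }

  public static void main(String[] args) {
    FailureInfoSketch sketch = new FailureInfoSketch();
    sketch.addVolumeFailureInfo(new VolumeFailureInfo("/data2", 500L));
    sketch.addVolumeFailureInfo(new VolumeFailureInfo("/data2", 0L)); // ignored
    System.out.println(sketch.failures.get("/data2").estimatedCapacityLost); // 500
  }
}
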
+ 25 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java

@@ -22,6 +22,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 import java.io.File;
 import java.io.IOException;
 
+import com.google.common.base.Supplier;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.test.GenericTestUtils;
 
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
@@ -228,4 +230,27 @@ public class DataNodeTestUtils {
     }
     return null;
   }
+
+  /**
+   * Trigger a disk check and wait for the DataNode to detect the disk failure.
+   *
+   * @param dn the DataNode to trigger the disk check on
+   * @param volume the volume expected to be reported as failed
+   * @throws Exception if the failure is not detected within the timeout
+   */
+  public static void waitForDiskError(final DataNode dn, FsVolumeSpi volume)
+      throws Exception {
+    LOG.info("Starting to wait for datanode to detect disk failure.");
+    final long lastDiskErrorCheck = dn.getLastDiskErrorCheck();
+    dn.checkDiskErrorAsync(volume);
+    // Wait 10 seconds for checkDiskError thread to finish and discover volume
+    // failures.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+
+      @Override
+      public Boolean get() {
+        return dn.getLastDiskErrorCheck() != lastDiskErrorCheck;
+      }
+    }, 100, 10000);
+  }
 }

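The waitForDiskError() helper added to DataNodeTestUtils is a poll-until-condition wait: it records the last disk-error check timestamp, kicks off checkDiskErrorAsync(), and then polls until the timestamp moves or ten seconds pass. A self-contained version of that polling pattern, without the Hadoop test utilities (names and timings are illustrative):

import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

/** Minimal stand-in for a GenericTestUtils.waitFor-style polling wait. */
public final class PollingWait {

  private PollingWait() { }

  /**
   * Polls the condition every checkEveryMillis until it returns true,
   * or fails with TimeoutException after waitForMillis.
   */
  public static void waitFor(BooleanSupplier condition, long checkEveryMillis,
      long waitForMillis) throws InterruptedException, TimeoutException {
    long deadline = System.nanoTime() + waitForMillis * 1_000_000L;
    while (!condition.getAsBoolean()) {
      if (System.nanoTime() >= deadline) {
        throw new TimeoutException(
            "Condition not met within " + waitForMillis + " ms");
      }
      Thread.sleep(checkEveryMillis);
    }
  }

  public static void main(String[] args) throws Exception {
    final long start = System.currentTimeMillis();
    // Example: wait up to 10 s, polling every 100 ms, for half a second to
    // elapse -- mirroring how waitForDiskError() waits for
    // getLastDiskErrorCheck() to move off its previous value.
    waitFor(() -> System.currentTimeMillis() - start > 500, 100, 10_000);
    System.out.println("condition met after ~"
        + (System.currentTimeMillis() - start) + " ms");
  }
}
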
+ 115 - 10
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java

@@ -25,12 +25,18 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
 
 import java.io.File;
+import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.concurrent.TimeUnit;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import com.google.common.base.Supplier;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -47,6 +53,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -396,8 +403,8 @@ public class TestDataNodeVolumeFailureReporting {
     DataNodeTestUtils.triggerHeartbeat(dns.get(0));
     DataNodeTestUtils.triggerHeartbeat(dns.get(1));
 
-    checkFailuresAtDataNode(dns.get(0), 1, false, dn1Vol1.getAbsolutePath());
-    checkFailuresAtDataNode(dns.get(1), 1, false, dn2Vol1.getAbsolutePath());
+    checkFailuresAtDataNode(dns.get(0), 1, true, dn1Vol1.getAbsolutePath());
+    checkFailuresAtDataNode(dns.get(1), 1, true, dn2Vol1.getAbsolutePath());
 
     // Ensure we wait a sufficient amount of time.
     assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
@@ -405,9 +412,9 @@ public class TestDataNodeVolumeFailureReporting {
     // The NN reports two volume failures again.
     DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
         origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
-    checkAggregateFailuresAtNameNode(false, 2);
-    checkFailuresAtNameNode(dm, dns.get(0), false, dn1Vol1.getAbsolutePath());
-    checkFailuresAtNameNode(dm, dns.get(1), false, dn2Vol1.getAbsolutePath());
+    checkAggregateFailuresAtNameNode(true, 2);
+    checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
+    checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
 
     // Reconfigure a third time with the failed volumes.  Afterwards, we expect
     // the same volume failures to be reported.  (No double-counting.)
@@ -417,8 +424,8 @@ public class TestDataNodeVolumeFailureReporting {
     DataNodeTestUtils.triggerHeartbeat(dns.get(0));
     DataNodeTestUtils.triggerHeartbeat(dns.get(1));
 
-    checkFailuresAtDataNode(dns.get(0), 1, false, dn1Vol1.getAbsolutePath());
-    checkFailuresAtDataNode(dns.get(1), 1, false, dn2Vol1.getAbsolutePath());
+    checkFailuresAtDataNode(dns.get(0), 1, true, dn1Vol1.getAbsolutePath());
+    checkFailuresAtDataNode(dns.get(1), 1, true, dn2Vol1.getAbsolutePath());
 
     // Ensure we wait a sufficient amount of time.
     assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
@@ -426,9 +433,9 @@ public class TestDataNodeVolumeFailureReporting {
     // The NN reports two volume failures again.
     DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
         origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
-    checkAggregateFailuresAtNameNode(false, 2);
-    checkFailuresAtNameNode(dm, dns.get(0), false, dn1Vol1.getAbsolutePath());
-    checkFailuresAtNameNode(dm, dns.get(1), false, dn2Vol1.getAbsolutePath());
+    checkAggregateFailuresAtNameNode(true, 2);
+    checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
+    checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
 
     // Replace failed volume with healthy volume and run reconfigure DataNode.
     // The failed volume information should be cleared.
@@ -503,6 +510,104 @@ public class TestDataNodeVolumeFailureReporting {
         currentVersion.exists());
   }
 
+  /**
+   * Verify DataNode NumFailedVolumes and FailedStorageLocations
+   * after hot-swapping out a failed volume.
+   */
+  @Test (timeout = 120000)
+  public void testHotSwapOutFailedVolumeAndReporting()
+          throws Exception {
+    LOG.info("Starting testHotSwapOutFailedVolumeAndReporting!");
+    final File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1));
+    final File dn0Vol2 = new File(dataDir, "data" + (2 * 0 + 2));
+    final DataNode dn0 = cluster.getDataNodes().get(0);
+    final String oldDataDirs = dn0.getConf().get(
+            DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+
+    final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+    final ObjectName mxbeanName = new ObjectName(
+        "Hadoop:service=DataNode,name=FSDatasetState-" + dn0.getDatanodeUuid());
+    int numFailedVolumes = (int) mbs.getAttribute(mxbeanName,
+        "NumFailedVolumes");
+    Assert.assertEquals(dn0.getFSDataset().getNumFailedVolumes(),
+        numFailedVolumes);
+    checkFailuresAtDataNode(dn0, 0, false, new String[] {});
+
+    // Fail dn0Vol1 first.
+    // Verify NumFailedVolumes and FailedStorageLocations are updated.
+    DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
+    DataNodeTestUtils.waitForDiskError(dn0,
+        DataNodeTestUtils.getVolume(dn0, dn0Vol1));
+    numFailedVolumes = (int) mbs.getAttribute(mxbeanName, "NumFailedVolumes");
+    Assert.assertEquals(1, numFailedVolumes);
+    Assert.assertEquals(dn0.getFSDataset().getNumFailedVolumes(),
+            numFailedVolumes);
+    checkFailuresAtDataNode(dn0, 1, true,
+        new String[] {dn0Vol1.getAbsolutePath()});
+
+    // Reconfigure disks without fixing the failed disk.
+    // Verify NumFailedVolumes and FailedStorageLocations haven't changed.
+    try {
+      dn0.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
+          oldDataDirs);
+    } catch (ReconfigurationException e) {
+      Assert.assertTrue("Reconfigure exception doesn't have expected path!",
+          e.getCause().getMessage().contains(dn0Vol1.getAbsolutePath()));
+    }
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          return ((int) mbs.getAttribute(mxbeanName,
+              "NumFailedVolumes") == 1);
+        } catch (Exception e) {
+          return false;
+        }
+      }
+    }, 1000, 30000);
+    Assert.assertEquals(dn0.getFSDataset().getNumFailedVolumes(),
+        numFailedVolumes);
+    checkFailuresAtDataNode(dn0, 1, true,
+        new String[] {dn0Vol1.getAbsolutePath()});
+
+    // Hot swap out the failed volume.
+    // Verify NumFailedVolumes and FailedStorageLocations are reset.
+    String dataDirs = dn0Vol2.getPath();
+    dn0.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
+            dataDirs);
+    numFailedVolumes = (int) mbs.getAttribute(mxbeanName, "NumFailedVolumes");
+    Assert.assertEquals(0, numFailedVolumes);
+    Assert.assertEquals(dn0.getFSDataset().getNumFailedVolumes(),
+            numFailedVolumes);
+    checkFailuresAtDataNode(dn0, 0, true, new String[] {});
+
+    // Fix the failed volume dn0Vol1 and remount it.
+    // Verify NumFailedVolumes and FailedStorageLocations are empty.
+    DataNodeTestUtils.restoreDataDirFromFailure(dn0Vol1);
+    dn0.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
+            oldDataDirs);
+    numFailedVolumes = (int) mbs.getAttribute(mxbeanName, "NumFailedVolumes");
+    Assert.assertEquals(0, numFailedVolumes);
+    Assert.assertEquals(dn0.getFSDataset().getNumFailedVolumes(),
+        numFailedVolumes);
+    checkFailuresAtDataNode(dn0, 0, true, new String[] {});
+
+    // Fail dn0Vol2.
+    // Verify NumFailedVolumes and FailedStorageLocations are updated.
+    DataNodeTestUtils.injectDataDirFailure(dn0Vol2);
+    DataNodeTestUtils.waitForDiskError(dn0,
+        DataNodeTestUtils.getVolume(dn0, dn0Vol2));
+    numFailedVolumes = (int) mbs.getAttribute(mxbeanName, "NumFailedVolumes");
+    Assert.assertEquals(1, numFailedVolumes);
+    Assert.assertEquals(dn0.getFSDataset().getNumFailedVolumes(),
+        numFailedVolumes);
+    checkFailuresAtDataNode(dn0, 1, true,
+        new String[] {dn0Vol2.getAbsolutePath()});
+
+    // Verify the DataNode keeps running while tolerating one disk failure.
+    assertTrue(dn0.shouldRun());
+  }
+
   /**
    * Checks the NameNode for correct values of aggregate counters tracking failed
    * volumes across all DataNodes.
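
The new test reads NumFailedVolumes both from FsDatasetSpi and from the FSDatasetState MBean and checks that the two stay in agreement across failure injection and reconfiguration. Outside the test, the same MBean attribute can be read with the standard JMX API roughly as below; the bean-name pattern is the one the test uses, and the snippet assumes it runs inside the DataNode JVM (a remote JMX connector would be needed otherwise):

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

/** Sketch: read a DataNode's failed-volume count from its FSDatasetState MBean. */
public class ReadFailedVolumes {

  static int numFailedVolumes(String datanodeUuid) throws Exception {
    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    // Same bean-name pattern the test above uses; only resolvable inside the
    // DataNode JVM or through a JMX connector attached to it.
    ObjectName name = new ObjectName(
        "Hadoop:service=DataNode,name=FSDatasetState-" + datanodeUuid);
    return (int) mbs.getAttribute(name, "NumFailedVolumes");
  }

  public static void main(String[] args) throws Exception {
    // Pass the DataNode UUID as the first argument.
    System.out.println(numFailedVolumes(args[0]));
  }
}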