Jelajahi Sumber

HDFS-11499. Decommissioning stuck because of failing recovery. Contributed by Lukas Majercak and Manoj Govindassamy.

(cherry picked from commit 385d2cb777a0272ac20c62336c944fad295d5d12)

 Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
Masatake Iwasaki 8 tahun lalu
induk
melakukan
60be2e5d8a

+ 9 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -801,7 +801,15 @@ public class BlockManager implements BlockStatsMXBean {
       return false; // already completed (e.g. by syncBlock)
     
     final boolean committed = commitBlock(lastBlock, commitBlock);
-    if (countNodes(lastBlock).liveReplicas() >= minReplication) {
+
+    // Count replicas on decommissioning nodes, as these will not be
+    // decommissioned unless recovery/completing last block has finished
+    NumberReplicas numReplicas = countNodes(lastBlock);
+    int numUsableReplicas = numReplicas.liveReplicas() +
+        numReplicas.decommissioning() +
+        numReplicas.liveEnteringMaintenanceReplicas();
+
+    if (numUsableReplicas >= minReplication) {
       if (committed) {
         addExpectedReplicasToPending(lastBlock);
       }

+ 48 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java

@@ -33,6 +33,7 @@ import java.util.concurrent.ExecutionException;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -647,6 +648,53 @@ public class TestDecommission extends AdminStatesBaseTest {
 
     fdos.close();
   }
+
+  @Test(timeout = 360000)
+  public void testDecommissionWithOpenFileAndBlockRecovery()
+      throws IOException, InterruptedException {
+    startCluster(1, 6);
+    getCluster().waitActive();
+
+    Path file = new Path("/testRecoveryDecommission");
+
+    // Create a file and never close the output stream to trigger recovery
+    DistributedFileSystem dfs = getCluster().getFileSystem();
+    FSDataOutputStream out = dfs.create(file, true,
+        getConf().getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+        (short) 3, blockSize);
+
+    // Write data to the file
+    long writtenBytes = 0;
+    while (writtenBytes < fileSize) {
+      out.writeLong(writtenBytes);
+      writtenBytes += 8;
+    }
+    out.hsync();
+
+    DatanodeInfo[] lastBlockLocations = NameNodeAdapter.getBlockLocations(
+      getCluster().getNameNode(), "/testRecoveryDecommission", 0, fileSize)
+      .getLastLocatedBlock().getLocations();
+
+    // Decommission all nodes of the last block
+    ArrayList<String> toDecom = new ArrayList<>();
+    for (DatanodeInfo dnDecom : lastBlockLocations) {
+      toDecom.add(dnDecom.getXferAddr());
+    }
+    initExcludeHosts(toDecom);
+    refreshNodes(0);
+
+    // Make sure hard lease expires to trigger replica recovery
+    getCluster().setLeasePeriod(300L, 300L);
+    Thread.sleep(2 * BLOCKREPORT_INTERVAL_MSEC);
+
+    for (DatanodeInfo dnDecom : lastBlockLocations) {
+      DatanodeInfo datanode = NameNodeAdapter.getDatanode(
+          getCluster().getNamesystem(), dnDecom);
+      waitNodeState(datanode, AdminStates.DECOMMISSIONED);
+    }
+
+    assertEquals(dfs.getFileStatus(file).getLen(), writtenBytes);
+  }
   
   /**
    * Tests restart of namenode while datanode hosts are added to exclude file

+ 51 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java

@@ -31,6 +31,8 @@ import java.util.List;
 import java.util.Map;
 
 import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,8 +43,10 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.util.Time;
 import org.junit.Test;
@@ -940,6 +944,53 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
     cleanupFile(fileSys, file);
   }
 
+  @Test(timeout = 120000)
+  public void testFileCloseAfterEnteringMaintenance() throws Exception {
+    LOG.info("Starting testFileCloseAfterEnteringMaintenance");
+    int expirationInMs = 30 * 1000;
+    int numDataNodes = 3;
+    int numNameNodes = 1;
+    getConf().setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, 2);
+
+    startCluster(numNameNodes, numDataNodes);
+    getCluster().waitActive();
+
+    FSNamesystem fsn = getCluster().getNameNode().getNamesystem();
+    List<String> hosts = new ArrayList<>();
+    for (DataNode dn : getCluster().getDataNodes()) {
+      hosts.add(dn.getDisplayName());
+      putNodeInService(0, dn.getDatanodeUuid());
+    }
+    assertEquals(numDataNodes, fsn.getNumLiveDataNodes());
+
+    Path openFile = new Path("/testClosingFileInMaintenance.dat");
+    // Lets write 2 blocks of data to the openFile
+    writeFile(getCluster().getFileSystem(), openFile, (short) 3);
+
+    // Lets write some more data and keep the file open
+    FSDataOutputStream fsDataOutputStream = getCluster().getFileSystem()
+        .append(openFile);
+    byte[] bytes = new byte[1024];
+    fsDataOutputStream.write(bytes);
+    fsDataOutputStream.hsync();
+
+    LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(
+        getCluster().getNameNode(0), openFile.toString(), 0, 3 * blockSize);
+    DatanodeInfo[] dnInfos4LastBlock = lbs.getLastLocatedBlock().getLocations();
+
+    // Request maintenance for DataNodes 1 and 2 which has the last block.
+    takeNodeOutofService(0,
+        Lists.newArrayList(dnInfos4LastBlock[0].getDatanodeUuid(),
+            dnInfos4LastBlock[1].getDatanodeUuid()),
+        Time.now() + expirationInMs,
+        null, null, AdminStates.ENTERING_MAINTENANCE);
+
+    // Closing the file should succeed even when the
+    // last blocks' nodes are entering maintenance.
+    fsDataOutputStream.close();
+    cleanupFile(getCluster().getFileSystem(), openFile);
+  }
+
   static String getFirstBlockFirstReplicaUuid(FileSystem fileSys,
       Path name) throws IOException {
     DatanodeInfo[] nodes = getFirstBlockReplicasDatanodeInfos(fileSys, name);