
HDFS-10240. Race between close/recoverLease leads to missing block. Contributed by Jinglun, zhouyingchao and Wei-Chiu Chuang.

(cherry picked from commit 865650052b07c8a20d51306202354ac770ed36d5)
Wei-Chiu Chuang, 6 years ago
parent commit bd46906bd8
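
The race this commit closes, as a schematic timeline (a sketch pieced together from the hunks below; apart from the methods named in the diff, the steps are paraphrase, not verbatim code paths):

    // writer thread                     recovery client
    // -------------                     ---------------
    // stm.hflush()
    //                                   dfs.recoverLease(path)
    //                                     NN marks the last block
    //                                     UNDER_RECOVERY and schedules a
    //                                     block recovery under a new
    //                                     generation stamp
    // stm.close()
    //   -> commitOrCompleteLastBlock() used to commit the block under the
    //      writer's stale generation stamp and close the file while the
    //      recovery was still in flight; the replicas the recovery later
    //      finalized no longer matched what the NN had committed, and the
    //      block could end up missing.
    //
    // After this patch, commitOrCompleteLastBlock() throws for an
    // UNDER_RECOVERY block, so close() fails fast and the recovery wins.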

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java

@@ -251,6 +251,10 @@ public abstract class BlockInfo extends Block
     return getBlockUCState().equals(BlockUCState.COMPLETE);
   }
 
+  public boolean isUnderRecovery() {
+    return getBlockUCState().equals(BlockUCState.UNDER_RECOVERY);
+  }
+
   public final boolean isCompleteOrCommitted() {
     final BlockUCState state = getBlockUCState();
     return state.equals(BlockUCState.COMPLETE) ||
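
The new predicate joins the existing state checks on BlockInfo, so the common BlockUCState queries each have a one-line reading (restating only methods visible in this hunk; `block` is any BlockInfo a caller holds):

    block.isComplete();             // BlockUCState.COMPLETE
    block.isUnderRecovery();        // BlockUCState.UNDER_RECOVERY (added here)
    block.isCompleteOrCommitted();  // COMPLETE or COMMITTED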

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -974,6 +974,10 @@ public class BlockManager implements BlockStatsMXBean {
       return false; // no blocks in file yet
     if(lastBlock.isComplete())
       return false; // already completed (e.g. by syncBlock)
+    if(lastBlock.isUnderRecovery()) {
+      throw new IOException("Commit or complete block " + commitBlock +
+          ", whereas it is under recovery.");
+    }
     
     final boolean committed = commitBlock(lastBlock, commitBlock);
     if (committed && lastBlock.isStriped()) {
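
From the writer's side, the losing half of the race now fails loudly instead of silently completing the file under a stale generation stamp. A hedged sketch of what a caller observes (it mirrors the test added further down; `out` is a hypothetical open output stream):

    try {
      out.close();
    } catch (IOException e) {
      // The message ends with "whereas it is under recovery." --
      // recoverLease got there first, and the recovery, not this close(),
      // will finalize the file.
    }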

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

@@ -682,7 +682,8 @@ class BPServiceActor implements Runnable {
             }
           }
         }
-        if (ibrManager.sendImmediately() || sendHeartbeat) {
+        if (!dn.areIBRDisabledForTests() &&
+            (ibrManager.sendImmediately() || sendHeartbeat)) {
           ibrManager.sendIBRs(bpNamenode, bpRegistration,
               bpos.getBlockPoolId());
         }

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -331,6 +331,7 @@ public class DataNode extends ReconfigurableBase
   ThreadGroup threadGroup = null;
   private DNConf dnConf;
   private volatile boolean heartbeatsDisabledForTests = false;
+  private volatile boolean ibrDisabledForTests = false;
   private volatile boolean cacheReportsDisabledForTests = false;
   private DataStorage storage = null;
 
@@ -1334,6 +1335,15 @@ public class DataNode extends ReconfigurableBase
   }
 
   @VisibleForTesting
+  void setIBRDisabledForTest(boolean disabled) {
+    this.ibrDisabledForTests = disabled;
+  }
+
+  @VisibleForTesting
+  boolean areIBRDisabledForTests() {
+    return this.ibrDisabledForTests;
+  }
+
   void setCacheReportsDisabledForTest(boolean disabled) {
     this.cacheReportsDisabledForTests = disabled;
   }
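
The flag copies the heartbeatsDisabledForTests pattern visible a few lines above. A same-package test flips it through the DataNodeTestUtils helper added at the bottom of this patch (sketch; the patch adds a pause helper but no matching resume, so re-enabling goes through the package-private setter):

    DataNodeTestUtils.pauseIBR(dn);   // -> dn.setIBRDisabledForTest(true)
    // ... run writes; pending IBRs accumulate but are never sent ...
    dn.setIBRDisabledForTest(false);  // re-enable IBRs directly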

+ 65 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery2.java

@@ -25,6 +25,7 @@ import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -163,6 +164,70 @@ public class TestLeaseRecovery2 {
     verifyFile(dfs, filepath1, actual, size);
   }
 
+  @Test
+  public void testCloseWhileRecoverLease() throws Exception {
+    // test recoverLease
+    // set the soft limit to be 1 hour but recoverLease should
+    // close the file immediately
+    cluster.setLeasePeriod(LONG_LEASE_PERIOD, LONG_LEASE_PERIOD);
+    int size = AppendTestUtil.nextInt(FILE_SIZE);
+    String filestr = "/testCloseWhileRecoverLease";
+
+    AppendTestUtil.LOG.info("filestr=" + filestr);
+    Path filepath = new Path(filestr);
+    FSDataOutputStream stm = dfs.create(filepath, true, BUF_SIZE,
+        REPLICATION_NUM, BLOCK_SIZE);
+    assertTrue(dfs.dfs.exists(filestr));
+
+    // hflush file
+    AppendTestUtil.LOG.info("hflush");
+    stm.hflush();
+
+    // Pause DN heartbeats and incremental block reports.
+    // Let the client recover the lease, then close the file, and only then
+    // let the DNs report their blocks.
+    ArrayList<DataNode> dataNodes = cluster.getDataNodes();
+    for (DataNode dn: dataNodes) {
+      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+    }
+
+    LOG.info("pause IBR");
+    for (DataNode dn: dataNodes) {
+      DataNodeTestUtils.pauseIBR(dn);
+    }
+
+    AppendTestUtil.LOG.info("size=" + size);
+    stm.write(buffer, 0, size);
+
+    // hflush file
+    AppendTestUtil.LOG.info("hflush");
+    stm.hflush();
+
+    LOG.info("recover lease");
+    dfs.recoverLease(filepath);
+    try {
+      stm.close();
+      fail("close() should fail because the file is under recovery.");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains(
+          "whereas it is under recovery", ioe);
+    }
+
+    // Resume DN heartbeats so the NN's recovery command can be delivered.
+    for (DataNode dn: dataNodes) {
+      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
+    }
+
+    LOG.info("trigger heartbeats");
+    for (DataNode dn: dataNodes) {
+      DataNodeTestUtils.triggerHeartbeat(dn);
+    }
+
+    stm.close();
+    assertEquals(0, cluster.getNamesystem().getBlockManager()
+        .getMissingBlocksCount());
+  }
+
   @Test
   public void testLeaseRecoverByAnotherUser() throws Exception {
     byte [] actual = new byte[FILE_SIZE];
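
A more defensive variant of the final check in testCloseWhileRecoverLease would poll instead of asserting once, in case the triggered heartbeats have not yet landed (GenericTestUtils.waitFor is an existing Hadoop test utility; the 100 ms/10 s timeouts are illustrative):

    // Poll until the NN reports no missing blocks.
    GenericTestUtils.waitFor(() -> cluster.getNamesystem().getBlockManager()
        .getMissingBlocksCount() == 0, 100, 10000);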

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java

@@ -98,6 +98,9 @@ public class DataNodeTestUtils {
     }
   }
 
+  public static void pauseIBR(DataNode dn) {
+    dn.setIBRDisabledForTest(true);
+  }
+
   public static InterDatanodeProtocol createInterDatanodeProtocolProxy(
       DataNode dn, DatanodeID datanodeid, final Configuration conf,
       boolean connectToDnViaHostname) throws IOException {