
HDFS-5557. Write pipeline recovery for the last packet in the block may cause rejection of valid replicas. Contributed by Kihwal Lee.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1547173 13f79535-47bb-0310-9956-ffa450edef68
Kihwal Lee 11 years ago
parent
commit
38a04a3042
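
Context for the diffs below: when the write pipeline is recovered for the last packet of a block, the client bumps the block's generation stamp (GS). Replicas that completed recovery carry the new GS, while a datanode that fell out of the pipeline keeps the old one. The sketch below is a toy Java model of the invariant the fix restores (simplified, hypothetical names; not HDFS code): every replica must be judged against the GS the block is finally committed with.

import java.util.*;

// Toy model of the bug scenario (hypothetical names; not HDFS code).
// Pipeline recovery for the last packet bumps the generation stamp (GS);
// the NameNode must judge every replica against the GS the block is
// finally committed with, or valid replicas get rejected as stale.
class LastPacketRecoveryDemo {
    public static void main(String[] args) {
        long oldGS = 1001L;
        long newGS = oldGS + 1;          // bumped by pipeline recovery

        // GS each datanode actually holds once recovery finishes:
        Map<String, Long> reported = new LinkedHashMap<>();
        reported.put("dn1", newGS);      // completed recovery: valid
        reported.put("dn2", oldGS);      // dropped out of the pipeline: stale
        reported.put("dn3", newGS);      // completed recovery: valid

        long committedGS = newGS;        // GS the block is committed with
        for (Map.Entry<String, Long> e : reported.entrySet()) {
            System.out.println(e.getKey() + ": "
                + (e.getValue() == committedGS ? "valid" : "stale, remove"));
        }
        // dn1 and dn3 survive, dn2 is pruned: the invariant the patched
        // commitBlock() enforces via setGenerationStampAndVerifyReplicas().
    }
}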

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -4060,6 +4060,9 @@ Release 0.23.10 - UNRELEASED
 
     HDFS-5526. Datanode cannot roll back to previous layout version (kihwal)
 
+    HDFS-5557. Write pipeline recovery for the last packet in the block may
+    cause rejection of valid replicas. (kihwal)
+
 Release 0.23.9 - 2013-07-08
 
   INCOMPATIBLE CHANGES

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -835,7 +835,6 @@ public class DFSOutputStream extends FSOutputSummer
           // We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that
           // a client waiting on close() will be aware that the flush finished.
           synchronized (dataQueue) {
-            assert dataQueue.size() == 1;
             Packet endOfBlockPacket = dataQueue.remove();  // remove the end of block packet
             assert endOfBlockPacket.lastPacketInBlock;
             assert lastAckedSeqno == endOfBlockPacket.seqno - 1;
@@ -1039,7 +1038,7 @@ public class DFSOutputStream extends FSOutputSummer
         
         // set up the pipeline again with the remaining nodes
         if (failPacket) { // for testing
-          success = createBlockOutputStream(nodes, newGS-1, isRecovery);
+          success = createBlockOutputStream(nodes, newGS, isRecovery);
           failPacket = false;
           try {
             // Give DNs time to send in bad reports. In real situations,

+ 5 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java

@@ -235,6 +235,8 @@ public class BlockInfoUnderConstruction extends BlockInfo {
    * @param genStamp  The final generation stamp for the block.
    */
   public void setGenerationStampAndVerifyReplicas(long genStamp) {
+    // Set the generation stamp for the block.
+    setGenerationStamp(genStamp);
     if (replicas == null)
       return;
 
@@ -244,12 +246,9 @@ public class BlockInfoUnderConstruction extends BlockInfo {
       if (genStamp != r.getGenerationStamp()) {
         r.getExpectedLocation().removeBlock(this);
         NameNode.blockStateChangeLog.info("BLOCK* Removing stale replica "
-            + "from location: " + r);
+            + "from location: " + r.getExpectedLocation());
       }
     }
-
-    // Set the generation stamp for the block.
-    setGenerationStamp(genStamp);
   }
 
   /**
@@ -264,6 +263,8 @@ public class BlockInfoUnderConstruction extends BlockInfo {
           + block.getBlockId() + ", expected id = " + getBlockId());
     blockUCState = BlockUCState.COMMITTED;
     this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp());
+    // Sort out invalid replicas.
+    setGenerationStampAndVerifyReplicas(block.getGenerationStamp());
   }
 
   /**

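The change above does two things: setGenerationStamp() now runs before the replica scan instead of after it, and commitBlock() ends by sorting out replicas whose recorded stamp no longer matches the committed one. A minimal runnable sketch of the resulting flow, using simplified stand-in types rather than the real BlockInfoUnderConstruction API:

import java.util.*;

// Minimal sketch of the patched flow (stand-in types, not the real classes):
// update the block's own stamp first, then drop expected-location records
// whose recorded stamp no longer matches.
class CommitFlowSketch {
    long blockGS;
    final Map<String, Long> expectedReplicas = new LinkedHashMap<>();

    void setGenerationStampAndVerifyReplicas(long genStamp) {
        blockGS = genStamp;                              // set first (moved up)
        expectedReplicas.values().removeIf(gs -> gs != genStamp);
    }

    void commitBlock(long committedGS) {
        // Sort out invalid replicas, mirroring the call added to commitBlock().
        setGenerationStampAndVerifyReplicas(committedGS);
    }

    public static void main(String[] args) {
        CommitFlowSketch b = new CommitFlowSketch();
        b.expectedReplicas.put("dn1", 1002L);
        b.expectedReplicas.put("dn2", 1001L);            // stale stamp
        b.commitBlock(1002L);
        System.out.println(b.expectedReplicas.keySet()); // prints [dn1]
    }
}
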
+ 12 - 12
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -1551,13 +1551,15 @@ public class BlockManager {
    * Besides the block in question, it provides the ReplicaState
    * reported by the datanode in the block report. 
    */
-  private static class StatefulBlockInfo {
+  static class StatefulBlockInfo {
     final BlockInfoUnderConstruction storedBlock;
+    final Block reportedBlock;
     final ReplicaState reportedState;
     
     StatefulBlockInfo(BlockInfoUnderConstruction storedBlock, 
-        ReplicaState reportedState) {
+        Block reportedBlock, ReplicaState reportedState) {
       this.storedBlock = storedBlock;
+      this.reportedBlock = reportedBlock;
       this.reportedState = reportedState;
     }
   }
@@ -1710,7 +1712,7 @@ public class BlockManager {
 
     // Process the blocks on each queue
     for (StatefulBlockInfo b : toUC) { 
-      addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);
+      addStoredBlockUnderConstruction(b, node);
     }
     for (Block b : toRemove) {
       removeStoredBlock(b, node);
@@ -1934,7 +1936,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
 
     if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
       toUC.add(new StatefulBlockInfo(
-          (BlockInfoUnderConstruction)storedBlock, reportedState));
+          (BlockInfoUnderConstruction)storedBlock, block, reportedState));
       return storedBlock;
     }
 
@@ -2105,13 +2107,11 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
     }
   }
   
-  void addStoredBlockUnderConstruction(
-      BlockInfoUnderConstruction block, 
-      DatanodeDescriptor node, 
-      ReplicaState reportedState) 
-  throws IOException {
-    block.addReplicaIfNotPresent(node, block, reportedState);
-    if (reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
+  void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
+      DatanodeDescriptor node) throws IOException {
+    BlockInfoUnderConstruction block = ucBlock.storedBlock;
+    block.addReplicaIfNotPresent(node, ucBlock.reportedBlock, ucBlock.reportedState);
+    if (ucBlock.reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
       addStoredBlock(block, node, null, true);
     }
   }
@@ -2678,7 +2678,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
       : "The block should be only in one of the lists.";
 
     for (StatefulBlockInfo b : toUC) { 
-      addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);
+      addStoredBlockUnderConstruction(b, node);
     }
     long numBlocksLogged = 0;
     for (BlockInfo b : toAdd) {

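The StatefulBlockInfo change is the subtle part: each queued entry now carries the block exactly as the datanode reported it. Assuming addReplicaIfNotPresent() takes the recorded stamp from its block argument (which the old call site, passing the stored block itself, suggests), the old bookkeeping could record a stale replica under the stored block's already-bumped stamp, letting it survive commit-time verification. A toy illustration (hypothetical names, not HDFS code):

import java.util.*;

// Toy contrast of the old and new replica bookkeeping (hypothetical names).
class ReportedVsStoredDemo {
    public static void main(String[] args) {
        long storedGS = 1002L;    // stored block's stamp, already bumped
        long reportedGS = 1001L;  // stamp the datanode actually has: stale

        Map<String, Long> records = new LinkedHashMap<>();
        records.put("dn2 via storedBlock (old)", storedGS);
        records.put("dn2 via reportedBlock (new)", reportedGS);

        long committedGS = 1002L;
        records.forEach((label, gs) -> System.out.println(label
            + " recorded at GS " + gs
            + (gs == committedGS ? " -> survives verification" : " -> pruned")));
        // The old bookkeeping lets the stale dn2 replica survive; the new one
        // records the reported stamp, so commit-time verification prunes it.
    }
}
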
+ 3 - 11
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java

@@ -139,16 +139,10 @@ public class TestClientProtocolForPipelineRecovery {
 
       Path file = new Path("dataprotocol1.dat");
       Mockito.when(faultInjector.failPacket()).thenReturn(true);
-      try {
-        DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
-      } catch (IOException e) {
-        // completeFile() should fail.
-        Assert.assertTrue(e.getMessage().startsWith("Unable to close file"));
-        return;
-      }
+      DFSTestUtil.createFile(fileSys, file, 68000000L, (short)numDataNodes, 0L);
 
-      // At this point, NN let data corruption to happen. 
-      // Before failing test, try reading the file. It should fail.
+      // At this point, NN should have accepted only valid replicas.
+      // Read should succeed.
       FSDataInputStream in = fileSys.open(file);
       try {
         int c = in.read();
@@ -158,8 +152,6 @@ public class TestClientProtocolForPipelineRecovery {
         Assert.fail("Block is missing because the file was closed with"
             + " corrupt replicas.");
       }
-      Assert.fail("The file was closed with corrupt replicas, but read still"
-          + " works!");
     } finally {
       DFSClientFaultInjector.instance = oldInjector;
       if (cluster != null) {

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java

@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.StatefulBlockInfo;
 import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -1075,8 +1076,8 @@ public class TestReplicationPolicy {
 
     // Adding this block will increase its current replication, and that will
     // remove it from the queue.
-    bm.addStoredBlockUnderConstruction(info,
-        TestReplicationPolicy.dataNodes[0], ReplicaState.FINALIZED);
+    bm.addStoredBlockUnderConstruction(new StatefulBlockInfo(info, info,
+        ReplicaState.FINALIZED), TestReplicationPolicy.dataNodes[0]);
 
     // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
     // from QUEUE_VERY_UNDER_REPLICATED.
@@ -1174,4 +1175,4 @@ public class TestReplicationPolicy {
     chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
     assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
   }
-}
+}