HDFS-12299. Race Between update pipeline and DN Re-Registration. Contributed by Brahma Reddy Battula.

(cherry picked from commit 5a83ffa396089972e23c533eca33c9cba231c45a)
Kihwal Lee 7 years ago
parent
commit
94bb6fa9df
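
What the race looks like (a hedged sketch, not the real NameNode code): when a DataNode re-registers, the NameNode prunes the storage state it held for that node and only rebuilds it from the node's next block report. If the client's updatePipeline RPC arrives inside that window, the pipeline's storage IDs resolve against the pruned registry and can come back null, which the pre-patch NameNode then wrapped into the block's replica list and tripped over with a NullPointerException. The toy Java model below (UpdatePipelineRace, Storage, and resolve are illustrative names, not real HDFS classes) reproduces just that lookup behavior:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class UpdatePipelineRace {
  static class Storage {
    final String id;
    Storage(String id) { this.id = id; }
    public String toString() { return id; }
  }

  // NameNode-side view of one DataNode's storages, keyed by storage ID.
  static final Map<String, Storage> storagesByID = new HashMap<>();

  static void register(String... ids) {
    // Re-registration starts from a clean slate; storages reappear
    // only once the DataNode sends its next block report.
    storagesByID.clear();
    for (String id : ids) {
      storagesByID.put(id, new Storage(id));
    }
  }

  // What updatePipeline has to work with: resolve the client's storage
  // IDs against the current registry. Entries wiped by a concurrent
  // re-registration resolve to null.
  static Storage[] resolve(String[] storageIDs) {
    Storage[] targets = new Storage[storageIDs.length];
    for (int i = 0; i < storageIDs.length; i++) {
      targets[i] = storagesByID.get(storageIDs[i]);
    }
    return targets;
  }

  public static void main(String[] args) {
    register("DS-1", "DS-2");             // initial registration
    String[] pipeline = {"DS-1", "DS-2"}; // storages in the client's pipeline
    register();                           // DN re-registers, no block report yet
    System.out.println(Arrays.toString(resolve(pipeline))); // [null, null]
    // Before this patch, the NameNode wrapped these nulls into the
    // block's replica list and hit an NPE; the diff below skips them.
  }
}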

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -45,6 +45,9 @@ Release 2.7.6 - UNRELEASED
     HDFS-13195. DataNode conf page cannot display the current value after
     reconfig. (Mao Baolong via kihwal)
 
+    HDFS-12299. Race Between update pipeline and DN Re-Registration.
+    (Brahma Reddy Battula. Backport by Wei-Chiu Chuang)
+
 Release 2.7.5 - 2017-12-14
 
   INCOMPATIBLE CHANGES

+ 15 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -1345,6 +1345,21 @@ public class DFSOutputStream extends FSOutputSummer
       return false; // do not sleep, continue processing
     }
 
+    void updateBlockGS(final long newGS) {
+      block.setGenerationStamp(newGS);
+    }
+
+    /** update pipeline at the namenode */
+    @VisibleForTesting
+    public void updatePipeline(long newGS) throws IOException {
+      final ExtendedBlock oldBlock = block.getCurrentBlock();
+      // the new GS has been propagated to all DN, it should be ok to update the
+      // local block state
+      updateBlockGS(newGS);
+      dfsClient.namenode.updatePipeline(dfsClient.clientName, oldBlock,
+          block.getCurrentBlock(), nodes, storageIDs);
+    }
+
     DatanodeInfo[] getExcludedNodes() {
       return excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
           .keySet().toArray(new DatanodeInfo[0]);
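
A note on ordering in the new updatePipeline above, as a hedged sketch: assuming getCurrentBlock() hands out a defensive copy of the shared block (which is what lets the first call pin the old generation stamp), oldBlock must be captured before updateBlockGS mutates the local state, because the NameNode matches the file's last under-construction block against the old identity and takes the new generation stamp from the updated copy. The toy classes below (GenstampOrdering, Block, and BlockToWrite are illustrative, not the real client types) show the invariant:

class GenstampOrdering {
  static class Block {
    long genstamp;
    Block(long gs) { genstamp = gs; }
  }

  static class BlockToWrite {
    private Block current;
    BlockToWrite(long gs) { current = new Block(gs); }
    // Assumed behavior: hands out a copy, so callers hold a snapshot.
    synchronized Block getCurrentBlock() { return new Block(current.genstamp); }
    synchronized void setGenerationStamp(long gs) { current.genstamp = gs; }
  }

  public static void main(String[] args) {
    BlockToWrite block = new BlockToWrite(1001L);
    Block oldBlock = block.getCurrentBlock(); // snapshot the old identity first
    block.setGenerationStamp(1002L);          // then apply the new genstamp
    // namenode.updatePipeline(clientName, oldBlock, newBlock, ...) needs
    // both: oldBlock to find the under-construction block, the updated
    // copy for the new genstamp. Reversing the two lines above would
    // lose the old stamp and leave the RPC with nothing to match.
    assert oldBlock.genstamp == 1001L;
    System.out.println(oldBlock.genstamp + " -> " + block.getCurrentBlock().genstamp);
  }
}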

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java

@@ -193,8 +193,9 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
     int numLocations = targets == null ? 0 : targets.length;
     this.replicas = new ArrayList<ReplicaUnderConstruction>(numLocations);
     for(int i = 0; i < numLocations; i++)
-      replicas.add(
-        new ReplicaUnderConstruction(this, targets[i], ReplicaState.RBW));
+      if (targets[i] != null) {
+        replicas.add(new ReplicaUnderConstruction(this, targets[i], ReplicaState.RBW));
+      }
   }
 
   /**
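
The hunk above is the heart of the fix: the replica-list loop in BlockInfoContiguousUnderConstruction now skips targets that resolved to null because the owning DataNode re-registered. A compact, self-contained restatement of the guarded loop (ExpectedLocations, Target, and Replica are illustrative stand-ins for the real DatanodeStorageInfo and ReplicaUnderConstruction types):

import java.util.ArrayList;
import java.util.List;

class ExpectedLocations {
  static class Target { final String id; Target(String id) { this.id = id; } }

  static class Replica {
    final Target expectedLocation;
    Replica(Target t) { this.expectedLocation = t; }
  }

  static List<Replica> build(Target[] targets) {
    int numLocations = targets == null ? 0 : targets.length;
    List<Replica> replicas = new ArrayList<>(numLocations);
    for (int i = 0; i < numLocations; i++) {
      if (targets[i] != null) {                // the HDFS-12299 guard: drop
        replicas.add(new Replica(targets[i])); // storages lost to re-registration
      }
    }
    return replicas;
  }

  public static void main(String[] args) {
    Target[] targets = { new Target("DS-1"), null }; // null from a re-registered DN
    System.out.println(build(targets).size()); // 1: the null is skipped, no later NPE
  }
}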

+ 50 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java

@@ -32,6 +32,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
@@ -527,4 +530,51 @@ public class TestClientProtocolForPipelineRecovery {
       }
     }
   }
+
+  @Test
+  public void testUpdatePipeLineAfterDNReg()throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+      cluster.waitActive();
+      FileSystem fileSys = cluster.getFileSystem();
+
+      Path file = new Path("/testUpdatePipeLineAfterDNReg");
+      FSDataOutputStream out = fileSys.create(file);
+      out.write(1);
+      out.hflush();
+      //Get the First DN and disable the heartbeats and then put in Deadstate
+      DFSOutputStream dfsOut = (DFSOutputStream) out.getWrappedStream();
+      DatanodeInfo[] pipeline = dfsOut.getPipeline();
+      DataNode dn1 = cluster.getDataNode(pipeline[0].getIpcPort());
+      dn1.setHeartbeatsDisabledForTests(true);
+      DatanodeDescriptor dn1Desc = cluster.getNamesystem(0).getBlockManager()
+          .getDatanodeManager().getDatanode(dn1.getDatanodeId());
+      cluster.setDataNodeDead(dn1Desc);
+      //Re-register the DeadNode
+      DatanodeProtocolClientSideTranslatorPB dnp =
+          new DatanodeProtocolClientSideTranslatorPB(
+          cluster.getNameNode().getNameNodeAddress(), conf);
+      dnp.registerDatanode(
+          dn1.getDNRegistrationForBP(cluster.getNamesystem().getBlockPoolId()));
+      DFSOutputStream dfsO = (DFSOutputStream) out.getWrappedStream();
+      String clientName = ((DistributedFileSystem) fileSys).getClient()
+          .getClientName();
+      NamenodeProtocols namenode = cluster.getNameNodeRpc();
+      //Update the genstamp and call updatepipeline
+      LocatedBlock newBlock = namenode
+          .updateBlockForPipeline(dfsO.getBlock(), clientName);
+      dfsO.getStreamer()
+          .updatePipeline(newBlock.getBlock().getGenerationStamp());
+      newBlock = namenode.updateBlockForPipeline(dfsO.getBlock(), clientName);
+      //Should not throw any error Pipeline should be success
+      dfsO.getStreamer()
+          .updatePipeline(newBlock.getBlock().getGenerationStamp());
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }