浏览代码

HDFS-6948. DN rejects blocks if it has older UC block. Contributed by
Eric Payne.
(cherry picked from commit f02d934fedf00f0ce43d6f3f9b06d89ccc6851a5)

Kihwal Lee 10 年之前
父节点
当前提交
2a6c9f0725

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -227,6 +227,9 @@ Release 2.6.0 - UNRELEASED
 
     HDFS-6970. Move startFile EDEK retries to the DFSClient. (wang)
 
+    HDFS-6948. DN rejects blocks if it has older UC block
+    (Eric Payne via kihwal)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)

+ 11 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -1090,9 +1090,17 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       ExtendedBlock b) throws IOException {
     ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId());
     if (replicaInfo != null) {
-      throw new ReplicaAlreadyExistsException("Block " + b +
-          " already exists in state " + replicaInfo.getState() +
-          " and thus cannot be created.");
+      if (replicaInfo.getGenerationStamp() < b.getGenerationStamp()
+          && replicaInfo instanceof ReplicaInPipeline) {
+        // Stop the previous writer
+        ((ReplicaInPipeline)replicaInfo)
+                      .stopWriter(datanode.getDnConf().getXceiverStopTimeout());
+        invalidate(b.getBlockPoolId(), new Block[]{replicaInfo});
+      } else {
+        throw new ReplicaAlreadyExistsException("Block " + b +
+            " already exists in state " + replicaInfo.getState() +
+            " and thus cannot be created.");
+      }
     }
     
     FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes());

+ 24 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java

@@ -111,7 +111,7 @@ public class TestWriteToReplica {
   
   // test writeToTemporary
   @Test
-  public void testWriteToTempoary() throws Exception {
+  public void testWriteToTemporary() throws Exception {
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build();
     try {
       cluster.waitActive();
@@ -475,5 +475,28 @@ public class TestWriteToReplica {
     }
     
     dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]);
+
+    try {
+      dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]);
+      Assert.fail("Should not have created a replica that had already been "
+          + "created " + blocks[NON_EXISTENT]);
+    } catch (Exception e) {
+      Assert.assertTrue(
+          e.getMessage().contains(blocks[NON_EXISTENT].getBlockName()));
+      Assert.assertTrue(e instanceof ReplicaAlreadyExistsException);
+    }
+
+    long newGenStamp = blocks[NON_EXISTENT].getGenerationStamp() * 10;
+    blocks[NON_EXISTENT].setGenerationStamp(newGenStamp);
+    try {
+      ReplicaInPipeline replicaInfo =
+                dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]);
+      Assert.assertTrue(replicaInfo.getGenerationStamp() == newGenStamp);
+      Assert.assertTrue(
+          replicaInfo.getBlockId() == blocks[NON_EXISTENT].getBlockId());
+    } catch (ReplicaAlreadyExistsException e) {
+      Assert.fail("createRbw() Should have removed the block with the older "
+          + "genstamp and replaced it with the newer one: " + blocks[NON_EXISTENT]);
+    }
   }
 }