Browse Source

HDFS-8492. DN should notify NN when client requests a missing block (Contributed by Walter Su)

Vinayakumar B 8 years ago
parent
commit
1cf6ec4ad4

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

@@ -36,6 +36,7 @@ import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -322,6 +323,12 @@ class BlockSender implements java.io.Closeable {
           } else {
             LOG.warn("Could not find metadata file for " + block);
           }
+        } catch (FileNotFoundException e) {
+          // The replica is on its volume map but not on disk
+          datanode.notifyNamenodeDeletedBlock(block, replica.getStorageUuid());
+          datanode.data.invalidate(block.getBlockPoolId(),
+              new Block[]{block.getLocalBlock()});
+          throw e;
         } finally {
           if (!keepMetaInOpen) {
             IOUtils.closeStream(metaIn);

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -738,7 +738,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   ReplicaInfo getBlockReplica(String bpid, long blockId) throws IOException {
     ReplicaInfo r = validateBlockFile(bpid, blockId);
     if (r == null) {
-      throw new IOException("BlockId " + blockId + " is not valid.");
+      throw new FileNotFoundException("BlockId " + blockId + " is not valid.");
     }
     return r;
   }

+ 36 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java

@@ -29,16 +29,20 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.util.HostsFileWriter;
+import org.junit.Assert;
 import org.junit.Test;
 
 /**
@@ -144,7 +148,37 @@ public class TestDatanodeReport {
       cluster.shutdown();
     }
   }
-  
+
+  @Test
+  public void testDatanodeReportMissingBlock() throws Exception {
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+    conf.setLong(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 1);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(NUM_OF_DATANODES).build();
+    try {
+      // wait until the cluster is up
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path p = new Path("/testDatanodeReportMissingBlock");
+      DFSTestUtil.writeFile(fs, p, new String("testdata"));
+      LocatedBlock lb = fs.getClient().getLocatedBlocks(p.toString(), 0).get(0);
+      assertEquals(3, lb.getLocations().length);
+      ExtendedBlock b = lb.getBlock();
+      cluster.corruptBlockOnDataNodesByDeletingBlockFile(b);
+      try {
+        DFSTestUtil.readFile(fs, p);
+        Assert.fail("Must throw exception as the block doesn't exists on disk");
+      } catch (IOException e) {
+        // all bad datanodes
+      }
+      cluster.triggerHeartbeats(); // IBR delete ack
+      lb = fs.getClient().getLocatedBlocks(p.toString(), 0).get(0);
+      assertEquals(0, lb.getLocations().length);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   final static Comparator<StorageReport> CMP = new Comparator<StorageReport>() {
     @Override
     public int compare(StorageReport left, StorageReport right) {