
HDFS-10343. BlockManager#createLocatedBlocks may return blocks on failed storages. Contributed by Kuhu Shukla.

(cherry picked from commit 57369a678c4c51627fe9a654e697a906a6bef123)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
Kihwal Lee, 8 years ago
commit 690ec789f0
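
The gist of the problem: createLocatedBlocks pre-sizes its machines array from the corrupt-replica bookkeeping and previously filtered out only corrupt replicas, so a replica sitting on a storage already marked FAILED (as happens when a disk failure is reported in a heartbeat) could still be handed to clients inside a LocatedBlock. The patch below additionally skips FAILED storages in the selection loop and truncates the array to the number of slots actually filled.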

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (+10, -2)

@@ -22,6 +22,7 @@ import static org.apache.hadoop.util.ExitUtil.terminate;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -959,16 +960,23 @@ public class BlockManager implements BlockStatsMXBean {
     final int numNodes = blocksMap.numNodes(blk);
     final boolean isCorrupt = numCorruptReplicas == numNodes;
     final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptReplicas;
-    final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
+    DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
     int j = 0;
     if (numMachines > 0) {
       for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
         final DatanodeDescriptor d = storage.getDatanodeDescriptor();
         final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
-        if (isCorrupt || (!replicaCorrupt))
+        if ((isCorrupt || (!replicaCorrupt)) &&
+            storage.getState() != State.FAILED) {
           machines[j++] = storage;
+        }
       }
     }
+
+    if(j < machines.length) {
+      machines = Arrays.copyOf(machines, j);
+    }
+
     assert j == machines.length :
       "isCorrupt: " + isCorrupt + 
       " numMachines: " + numMachines +

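The same filter-then-truncate pattern, pulled out into a minimal standalone sketch; the Storage class, State enum, and excludeFailed method here are illustrative stand-ins for the HDFS types, not actual HDFS code:

import java.util.Arrays;

public class FilterThenTruncate {
  enum State { NORMAL, FAILED }

  /** Illustrative stand-in for DatanodeStorageInfo. */
  static class Storage {
    final String id;
    final State state;
    Storage(String id, State state) { this.id = id; this.state = state; }
    @Override public String toString() { return id; }
  }

  /**
   * Pre-size the result for the worst case, fill it conditionally,
   * then shrink with Arrays.copyOf so callers never see null slots.
   */
  static Storage[] excludeFailed(Storage[] all) {
    Storage[] machines = new Storage[all.length];
    int j = 0;
    for (Storage s : all) {
      if (s.state != State.FAILED) {
        machines[j++] = s;
      }
    }
    if (j < machines.length) {
      machines = Arrays.copyOf(machines, j); // drop the null tail
    }
    return machines;
  }

  public static void main(String[] args) {
    Storage[] all = {
        new Storage("s1", State.NORMAL),
        new Storage("s2", State.FAILED),
        new Storage("s3", State.NORMAL)
    };
    System.out.println(Arrays.toString(excludeFailed(all))); // prints [s1, s3]
  }
}

Pre-sizing for the worst case and shrinking once at the end keeps the loop allocation-free; after the Arrays.copyOf call, j == machines.length holds by construction, which is why the pre-existing assert retained at the bottom of the hunk can no longer fire for the failed-storage case.
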
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java (+78, -0)

@@ -71,8 +71,10 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
@@ -86,6 +88,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
@@ -1084,6 +1087,81 @@ public class TestBlockManager {
     }
   }
 
+  @Test
+  public void testBlockManagerMachinesArray() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
+    cluster.waitActive();
+    BlockManager blockManager = cluster.getNamesystem().getBlockManager();
+    FileSystem fs = cluster.getFileSystem();
+    final Path filePath = new Path("/tmp.txt");
+    final long fileLen = 1L;
+    DFSTestUtil.createFile(fs, filePath, fileLen, (short) 3, 1L);
+    ArrayList<DataNode> datanodes = cluster.getDataNodes();
+    assertEquals(datanodes.size(), 4);
+    FSNamesystem ns = cluster.getNamesystem();
+    // get the block
+    final String bpid = cluster.getNamesystem().getBlockPoolId();
+    File storageDir = cluster.getInstanceStorageDir(0, 0);
+    File dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
+    assertTrue("Data directory does not exist", dataDir.exists());
+    BlockInfo blockInfo = blockManager.blocksMap.getBlocks().iterator().next();
+    ExtendedBlock blk = new ExtendedBlock(bpid, blockInfo.getBlockId(),
+        blockInfo.getNumBytes(), blockInfo.getGenerationStamp());
+    DatanodeDescriptor failedStorageDataNode =
+        blockManager.getStoredBlock(blockInfo).getDatanode(0);
+    DatanodeDescriptor corruptStorageDataNode =
+        blockManager.getStoredBlock(blockInfo).getDatanode(1);
+
+    ArrayList<StorageReport> reports = new ArrayList<StorageReport>();
+    for(int i=0; i<failedStorageDataNode.getStorageInfos().length; i++) {
+      DatanodeStorageInfo storageInfo = failedStorageDataNode
+          .getStorageInfos()[i];
+      DatanodeStorage dns = new DatanodeStorage(
+          failedStorageDataNode.getStorageInfos()[i].getStorageID(),
+          DatanodeStorage.State.FAILED,
+          failedStorageDataNode.getStorageInfos()[i].getStorageType());
+      while(storageInfo.getBlockIterator().hasNext()) {
+        BlockInfo blockInfo1 = storageInfo.getBlockIterator().next();
+        if(blockInfo1.equals(blockInfo)) {
+          StorageReport report = new StorageReport(
+              dns, true, storageInfo.getCapacity(),
+              storageInfo.getDfsUsed(), storageInfo.getRemaining(),
+              storageInfo.getBlockPoolUsed());
+          reports.add(report);
+          break;
+        }
+      }
+    }
+    failedStorageDataNode.updateHeartbeat(reports.toArray(StorageReport
+        .EMPTY_ARRAY), 0L, 0L, 0, 0, null);
+    ns.writeLock();
+    DatanodeStorageInfo corruptStorageInfo= null;
+    for(int i=0; i<corruptStorageDataNode.getStorageInfos().length; i++) {
+      corruptStorageInfo = corruptStorageDataNode.getStorageInfos()[i];
+      while(corruptStorageInfo.getBlockIterator().hasNext()) {
+        BlockInfo blockInfo1 = corruptStorageInfo.getBlockIterator().next();
+        if (blockInfo1.equals(blockInfo)) {
+          break;
+        }
+      }
+    }
+    blockManager.findAndMarkBlockAsCorrupt(blk, corruptStorageDataNode,
+        corruptStorageInfo.getStorageID(),
+        CorruptReplicasMap.Reason.ANY.toString());
+    ns.writeUnlock();
+    BlockInfo[] blockInfos = new BlockInfo[] {blockInfo};
+    ns.readLock();
+    LocatedBlocks locatedBlocks =
+        blockManager.createLocatedBlocks(blockInfos, 3L, false, 0L, 3L,
+        false, false, null);
+    assertTrue("Located Blocks should exclude corrupt" +
+        "replicas and failed storages",
+        locatedBlocks.getLocatedBlocks().size() == 1);
+    ns.readUnlock();
+  }
+
   @Test
   public void testMetaSaveCorruptBlocks() throws Exception {
     List<DatanodeStorageInfo> origStorages = getStorages(0, 1);
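
The new test exercises both exclusion paths on a 4-datanode MiniDFSCluster: it marks the storage holding the first replica as FAILED via a heartbeat carrying a FAILED StorageReport, marks the second replica corrupt through findAndMarkBlockAsCorrupt, and then asserts that createLocatedBlocks still returns exactly one located block. To run just this test, assuming the standard Hadoop Maven layout, an invocation along these lines should work:

    mvn test -Dtest=TestBlockManager#testBlockManagerMachinesArray -pl hadoop-hdfs-project/hadoop-hdfs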