Browse code

HDFS-9070. Allow fsck display pending replica location information for being-written blocks. Contributed by Gao Rui.

Jing Zhao, 9 years ago
parent
current commit
d806a5bf07

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -1546,6 +1546,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8647. Abstract BlockManager's rack policy into BlockPlacementPolicy.
     (Brahma Reddy Battula via mingma)
 
+    HDFS-9070. Allow fsck display pending replica location information for
+    being-written blocks. (GAO Rui via jing9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

+ 94 - 56
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -122,6 +122,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
   public static final String FAILURE_STATUS = "FAILED";
 
   private final NameNode namenode;
+  private final BlockManager blockManager;
   private final NetworkTopology networktopology;
   private final int totalDatanodes;
   private final InetAddress remoteAddress;
@@ -196,6 +197,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
       int totalDatanodes, InetAddress remoteAddress) {
     this.conf = conf;
     this.namenode = namenode;
+    this.blockManager = namenode.getNamesystem().getBlockManager();
     this.networktopology = networktopology;
     this.out = out;
     this.totalDatanodes = totalDatanodes;
@@ -249,24 +251,23 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
       return;
     }
 
-    BlockManager bm = namenode.getNamesystem().getBlockManager();
     try {
       //get blockInfo
       Block block = new Block(Block.getBlockId(blockId));
       //find which file this block belongs to
-      BlockInfo blockInfo = bm.getStoredBlock(block);
+      BlockInfo blockInfo = blockManager.getStoredBlock(block);
       if(blockInfo == null) {
         out.println("Block "+ blockId +" " + NONEXISTENT_STATUS);
         LOG.warn("Block "+ blockId + " " + NONEXISTENT_STATUS);
         return;
       }
-      BlockCollection bc = bm.getBlockCollection(blockInfo);
+      BlockCollection bc = blockManager.getBlockCollection(blockInfo);
       INode iNode = (INode) bc;
-      NumberReplicas numberReplicas= bm.countNodes(blockInfo);
+      NumberReplicas numberReplicas= blockManager.countNodes(blockInfo);
       out.println("Block Id: " + blockId);
       out.println("Block belongs to: "+iNode.getFullPathName());
       out.println("No. of Expected Replica: " +
-          bm.getExpectedReplicaNum(blockInfo));
+          blockManager.getExpectedReplicaNum(blockInfo));
       out.println("No. of live Replica: " + numberReplicas.liveReplicas());
       out.println("No. of excess Replica: " + numberReplicas.excessReplicas());
       out.println("No. of stale Replica: " +
@@ -279,8 +280,8 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
           numberReplicas.corruptReplicas());
       //record datanodes that have corrupted block replica
       Collection<DatanodeDescriptor> corruptionRecord = null;
-      if (bm.getCorruptReplicas(block) != null) {
-        corruptionRecord = bm.getCorruptReplicas(block);
+      if (blockManager.getCorruptReplicas(block) != null) {
+        corruptionRecord = blockManager.getCorruptReplicas(block);
       }
 
       //report block replicas status on datanodes
@@ -289,8 +290,8 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
         out.print("Block replica on datanode/rack: " + dn.getHostName() +
             dn.getNetworkLocation() + " ");
         if (corruptionRecord != null && corruptionRecord.contains(dn)) {
-          out.print(CORRUPT_STATUS+"\t ReasonCode: "+
-            bm.getCorruptReason(block,dn));
+          out.print(CORRUPT_STATUS + "\t ReasonCode: " +
+              blockManager.getCorruptReason(block, dn));
         } else if (dn.isDecommissioned() ){
           out.print(DECOMMISSIONED_STATUS);
         } else if (dn.isDecommissionInProgress()) {
@@ -546,8 +547,63 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
     }
   }
 
-  private void collectBlocksSummary(String parent, HdfsFileStatus file, Result res,
-      LocatedBlocks blocks) throws IOException {
+  /**
+   * Display info of each replica for replication block.
+   * For striped block group, display info of each internal block.
+   */
+  private String getReplicaInfo(BlockInfo storedBlock) {
+    if (!(showLocations || showRacks || showReplicaDetails)) {
+      return "";
+    }
+    final boolean isComplete = storedBlock.isComplete();
+    DatanodeStorageInfo[] storages = isComplete ?
+        blockManager.getStorages(storedBlock) :
+        storedBlock.getUnderConstructionFeature().getExpectedStorageLocations();
+    StringBuilder sb = new StringBuilder(" [");
+
+    for (int i = 0; i < storages.length; i++) {
+      DatanodeStorageInfo storage = storages[i];
+      DatanodeDescriptor dnDesc = storage.getDatanodeDescriptor();
+      if (showRacks) {
+        sb.append(NodeBase.getPath(dnDesc));
+      } else {
+        sb.append(new DatanodeInfoWithStorage(dnDesc, storage.getStorageID(),
+            storage.getStorageType()));
+      }
+      if (showReplicaDetails) {
+        LightWeightHashSet<BlockInfo> blocksExcess =
+            blockManager.excessReplicateMap.get(dnDesc.getDatanodeUuid());
+        Collection<DatanodeDescriptor> corruptReplicas =
+            blockManager.getCorruptReplicas(storedBlock);
+        sb.append("(");
+        if (dnDesc.isDecommissioned()) {
+          sb.append("DECOMMISSIONED)");
+        } else if (dnDesc.isDecommissionInProgress()) {
+          sb.append("DECOMMISSIONING)");
+        } else if (corruptReplicas != null
+            && corruptReplicas.contains(dnDesc)) {
+          sb.append("CORRUPT)");
+        } else if (blocksExcess != null
+            && blocksExcess.contains(storedBlock)) {
+          sb.append("EXCESS)");
+        } else if (dnDesc.isStale(this.staleInterval)) {
+          sb.append("STALE_NODE)");
+        } else if (storage.areBlockContentsStale()) {
+          sb.append("STALE_BLOCK_CONTENT)");
+        } else {
+          sb.append("LIVE)");
+        }
+      }
+      if (i < storages.length - 1) {
+        sb.append(", ");
+      }
+    }
+    sb.append(']');
+    return sb.toString();
+  }
+
+  private void collectBlocksSummary(String parent, HdfsFileStatus file,
+                 Result res, LocatedBlocks blocks) throws IOException {
     String path = file.getFullName(parent);
     boolean isOpen = blocks.isUnderConstruction();
     if (isOpen && !showOpenFiles) {
@@ -570,13 +626,12 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
         // it is under construction
         continue;
       }
-      BlockManager bm = namenode.getNamesystem().getBlockManager();
 
-      final BlockInfo storedBlock = bm.getStoredBlock(
+      final BlockInfo storedBlock = blockManager.getStoredBlock(
           block.getLocalBlock());
-      final int minReplication = bm.getMinStorageNum(storedBlock);
+      final int minReplication = blockManager.getMinStorageNum(storedBlock);
       // count decommissionedReplicas / decommissioningReplicas
-      NumberReplicas numberReplicas = bm.countNodes(storedBlock);
+      NumberReplicas numberReplicas = blockManager.countNodes(storedBlock);
       int decommissionedReplicas = numberReplicas.decommissioned();
       int decommissioningReplicas = numberReplicas.decommissioning();
       res.decommissionedReplicas +=  decommissionedReplicas;
@@ -663,7 +718,8 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
 
       // report
       String blkName = block.toString();
-      report.append(blockNumber + ". " + blkName + " len=" + block.getNumBytes());
+      report.append(blockNumber + ". " + blkName + " len=" +
+          block.getNumBytes());
       if (totalReplicasPerBlock == 0 && !isCorrupt) {
         // If the block is corrupted, it means all its available replicas are
         // corrupted. We don't mark it as missing given these available replicas
@@ -675,52 +731,34 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
         missize += block.getNumBytes();
       } else {
         report.append(" Live_repl=" + liveReplicas);
-        if (showLocations || showRacks || showReplicaDetails) {
-          StringBuilder sb = new StringBuilder("[");
-          DatanodeStorageInfo[] storages = bm.getStorages(storedBlock);
-          for (int i = 0; i < storages.length; i++) {
-            DatanodeStorageInfo storage = storages[i];
-            DatanodeDescriptor dnDesc = storage.getDatanodeDescriptor();
-            if (showRacks) {
-              sb.append(NodeBase.getPath(dnDesc));
-            } else {
-              sb.append(new DatanodeInfoWithStorage(dnDesc, storage.getStorageID(), storage
-                  .getStorageType()));
-            }
-            if (showReplicaDetails) {
-              LightWeightHashSet<BlockInfo> blocksExcess =
-                  bm.excessReplicateMap.get(dnDesc.getDatanodeUuid());
-              Collection<DatanodeDescriptor> corruptReplicas =
-                  bm.getCorruptReplicas(block.getLocalBlock());
-              sb.append("(");
-              if (dnDesc.isDecommissioned()) {
-                sb.append("DECOMMISSIONED)");
-              } else if (dnDesc.isDecommissionInProgress()) {
-                sb.append("DECOMMISSIONING)");
-              } else if (corruptReplicas != null && corruptReplicas.contains(dnDesc)) {
-                sb.append("CORRUPT)");
-              } else if (blocksExcess != null && blocksExcess.contains(block.getLocalBlock())) {
-                sb.append("EXCESS)");
-              } else if (dnDesc.isStale(this.staleInterval)) {
-                sb.append("STALE_NODE)");
-              } else if (storage.areBlockContentsStale()) {
-                sb.append("STALE_BLOCK_CONTENT)");
-              } else {
-                sb.append("LIVE)");
-              }
-            }
-            if (i < storages.length - 1) {
-              sb.append(", ");
-            }
-          }
-          sb.append(']');
-          report.append(" " + sb.toString());
+        String info = getReplicaInfo(storedBlock);
+        if (!info.isEmpty()){
+          report.append(" ").append(info);
         }
       }
       report.append('\n');
       blockNumber++;
     }
 
+    //display under construction block info.
+    if (!blocks.isLastBlockComplete() && lastBlock != null) {
+      ExtendedBlock block = lastBlock.getBlock();
+      String blkName = block.toString();
+      BlockInfo storedBlock = blockManager.getStoredBlock(
+          block.getLocalBlock());
+      DatanodeStorageInfo[] storages = storedBlock
+          .getUnderConstructionFeature().getExpectedStorageLocations();
+      report.append('\n');
+      report.append("Under Construction Block:\n");
+      report.append(blockNumber).append(". ").append(blkName);
+      report.append(" len=").append(block.getNumBytes());
+      report.append(" Expected_repl=" + storages.length);
+      String info=getReplicaInfo(storedBlock);
+      if (!info.isEmpty()){
+        report.append(" ").append(info);
+      }
+    }
+
     // count corrupt file & move or delete if necessary
     if ((missing > 0) || (corrupt > 0)) {
       if (!showFiles) {

+ 75 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java

@@ -625,9 +625,11 @@ public class TestFsck {
       assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
       assertFalse(outStr.contains("OPENFORWRITE")); 
       // Use -openforwrite option to list open files
-      outStr = runFsck(conf, 0, true, topDir, "-openforwrite");
+      outStr = runFsck(conf, 0, true, topDir, "-files", "-blocks",
+          "-locations", "-openforwrite");
       System.out.println(outStr);
       assertTrue(outStr.contains("OPENFORWRITE"));
+      assertTrue(outStr.contains("Under Construction Block:"));
       assertTrue(outStr.contains("openFile"));
       // Close the file
       out.close(); 
@@ -636,6 +638,7 @@ public class TestFsck {
       System.out.println(outStr);
       assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
       assertFalse(outStr.contains("OPENFORWRITE"));
+      assertFalse(outStr.contains("Under Construction Block:"));
       util.cleanup(fs, topDir);
       if (fs != null) {try{fs.close();} catch(Exception e){}}
       cluster.shutdown();
@@ -645,6 +648,77 @@ public class TestFsck {
     }
   }
 
+  @Test
+  public void testFsckOpenECFiles() throws Exception {
+    DFSTestUtil util = new DFSTestUtil.Builder().setName("TestFsckECFile").
+        setNumFiles(4).build();
+    MiniDFSCluster cluster = null;
+    FileSystem fs = null;
+    String outStr;
+    try {
+      Configuration conf = new HdfsConfiguration();
+      conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(10).build();
+      cluster.getFileSystem().getClient().setErasureCodingPolicy("/", null);
+      String topDir = "/myDir";
+      byte[] randomBytes = new byte[3000000];
+      int seed = 42;
+      new Random(seed).nextBytes(randomBytes);
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      util.createFiles(fs, topDir);
+
+      // Open a EC file for writing and do not close for now
+      Path openFile = new Path(topDir + "/openECFile");
+      FSDataOutputStream out = fs.create(openFile);
+      int writeCount = 0;
+      while (writeCount != 300) {
+        out.write(randomBytes);
+        writeCount++;
+      }
+
+      // We expect the filesystem to be HEALTHY and show one open file
+      outStr = runFsck(conf, 0, true, openFile.toString(), "-files",
+          "-blocks", "-openforwrite");
+      assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
+      assertTrue(outStr.contains("OPENFORWRITE"));
+      assertTrue(outStr.contains("Live_repl=9"));
+      assertTrue(outStr.contains("Expected_repl=9"));
+
+      // Use -openforwrite option to list open files
+      outStr = runFsck(conf, 0, true, openFile.toString(), "-files", "-blocks",
+          "-locations", "-openforwrite", "-replicaDetails");
+      assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
+      assertTrue(outStr.contains("OPENFORWRITE"));
+      assertTrue(outStr.contains("Live_repl=9"));
+      assertTrue(outStr.contains("Expected_repl=9"));
+      assertTrue(outStr.contains("Under Construction Block:"));
+
+      // Close the file
+      out.close();
+
+      // Now, fsck should show HEALTHY fs and should not show any open files
+      outStr = runFsck(conf, 0, true, openFile.toString(), "-files", "-blocks",
+          "-locations", "-racks", "-replicaDetails");
+      assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
+      assertFalse(outStr.contains("OPENFORWRITE"));
+      assertFalse(outStr.contains("Under Construction Block:"));
+      assertFalse(outStr.contains("Expected_repl=9"));
+      assertTrue(outStr.contains("Live_repl=9"));
+      util.cleanup(fs, topDir);
+    } finally {
+      if (fs != null) {
+        try {
+          fs.close();
+        } catch (Exception e) {
+        }
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   @Test
   public void testCorruptBlock() throws Exception {
     Configuration conf = new HdfsConfiguration();