
HADOOP-4687. Merge -r 784663:785643 from trunk to branch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/HADOOP-4687/hdfs@785794 13f79535-47bb-0310-9956-ffa450edef68
Owen O'Malley 16 years ago
parent
commit
99965c0243

+ 1 - 0
build.xml

@@ -373,6 +373,7 @@
     <copy file="${test.src.dir}/hdfs/org/apache/hadoop/cli/clitest_data/data30bytes" todir="${test.cache.data}"/>
     <copy file="${test.src.dir}/hdfs/org/apache/hadoop/cli/clitest_data/data60bytes" todir="${test.cache.data}"/>
     <copy file="${test.src.dir}/hdfs/org/apache/hadoop/cli/clitest_data/data120bytes" todir="${test.cache.data}"/>
+    <copy file="${test.src.dir}/hdfs/org/apache/hadoop/cli/clitest_data/data1k" todir="${test.cache.data}"/>
     <copy file="${test.src.dir}/hdfs/org/apache/hadoop/hdfs/tools/offlineImageViewer/fsimageV18" todir="${test.cache.data}"/>
     <copy file="${test.src.dir}/hdfs/org/apache/hadoop/hdfs/tools/offlineImageViewer/fsimageV19" todir="${test.cache.data}"/>
   </target>

+ 58 - 26
src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java

@@ -48,16 +48,24 @@ import org.apache.hadoop.security.AccessTokenHandler;
  * methods to be called with lock held on {@link FSNamesystem}.
  */
 public class BlockManager {
+  // Default initial capacity and load factor of map
+  public static final int DEFAULT_INITIAL_MAP_CAPACITY = 16;
+  public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;
+
   private final FSNamesystem namesystem;
 
-  long pendingReplicationBlocksCount = 0L, corruptReplicaBlocksCount,
-  underReplicatedBlocksCount = 0L, scheduledReplicationBlocksCount = 0L;
+  volatile long pendingReplicationBlocksCount = 0L;
+  volatile long corruptReplicaBlocksCount = 0L;
+  volatile long underReplicatedBlocksCount = 0L;
+  volatile long scheduledReplicationBlocksCount = 0L;
+  volatile long excessBlocksCount = 0L;
+  volatile long pendingDeletionBlocksCount = 0L;
 
   //
   // Mapping: Block -> { INode, datanodes, self ref }
   // Updated only in response to client-sent information.
   //
-  BlocksMap blocksMap = new BlocksMap();
+  final BlocksMap blocksMap;
 
   //
   // Store blocks-->datanodedescriptor(s) map of corrupt replicas
@@ -110,11 +118,17 @@ public class BlockManager {
   ReplicationTargetChooser replicator;
 
   BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
+    this(fsn, conf, DEFAULT_INITIAL_MAP_CAPACITY);
+  }
+  
+  BlockManager(FSNamesystem fsn, Configuration conf, int capacity)
+      throws IOException {
     namesystem = fsn;
     pendingReplications = new PendingReplicationBlocks(
         conf.getInt("dfs.replication.pending.timeout.sec",
                     -1) * 1000L);
     setConfigurationParameters(conf);
+    blocksMap = new BlocksMap(capacity, DEFAULT_MAP_LOAD_FACTOR);
   }
 
   void setConfigurationParameters(Configuration conf) throws IOException {
@@ -324,8 +338,11 @@ public class BlockManager {
 
   void removeFromInvalidates(String datanodeId, Block block) {
     Collection<Block> v = recentInvalidateSets.get(datanodeId);
-    if (v != null && v.remove(block) && v.isEmpty()) {
-      recentInvalidateSets.remove(datanodeId);
+    if (v != null && v.remove(block)) {
+      pendingDeletionBlocksCount--;
+      if (v.isEmpty()) {
+        recentInvalidateSets.remove(datanodeId);
+      }
     }
   }
 
@@ -344,6 +361,7 @@ public class BlockManager {
       recentInvalidateSets.put(dn.getStorageID(), invalidateSet);
     }
     if (invalidateSet.add(b)) {
+      pendingDeletionBlocksCount++;
       NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
           + b.getBlockName() + " is added to invalidSet of " + dn.getName());
     }
@@ -366,7 +384,8 @@ public class BlockManager {
    */
   private void dumpRecentInvalidateSets(PrintWriter out) {
     int size = recentInvalidateSets.values().size();
-    out.println("Metasave: Blocks waiting deletion from "+size+" datanodes.");
+    out.println("Metasave: Blocks " + pendingDeletionBlocksCount 
+        + " waiting deletion from " + size + " datanodes.");
     if (size == 0) {
       return;
     }
@@ -1101,10 +1120,12 @@ public class BlockManager {
       excessBlocks = new TreeSet<Block>();
       excessReplicateMap.put(dn.getStorageID(), excessBlocks);
     }
-    excessBlocks.add(block);
-    NameNode.stateChangeLog.debug("BLOCK* NameSystem.chooseExcessReplicates: "
-        + "(" + dn.getName() + ", " + block
-        + ") is added to excessReplicateMap");
+    if (excessBlocks.add(block)) {
+      excessBlocksCount++;
+      NameNode.stateChangeLog.debug("BLOCK* NameSystem.chooseExcessReplicates:"
+          + " (" + dn.getName() + ", " + block
+          + ") is added to excessReplicateMap");
+    }
   }
 
   /**
@@ -1140,11 +1161,13 @@ public class BlockManager {
       Collection<Block> excessBlocks = excessReplicateMap.get(node
           .getStorageID());
       if (excessBlocks != null) {
-        excessBlocks.remove(block);
-        NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
-            + block + " is removed from excessBlocks");
-        if (excessBlocks.size() == 0) {
-          excessReplicateMap.remove(node.getStorageID());
+        if (excessBlocks.remove(block)) {
+          excessBlocksCount--;
+          NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
+              + block + " is removed from excessBlocks");
+          if (excessBlocks.size() == 0) {
+            excessReplicateMap.remove(node.getStorageID());
+          }
         }
       }
 
@@ -1243,12 +1266,7 @@ public class BlockManager {
   }
 
   int getActiveBlockCount() {
-    int activeBlocks = blocksMap.size();
-    for(Iterator<Collection<Block>> it =
-          recentInvalidateSets.values().iterator(); it.hasNext();) {
-      activeBlocks -= it.next().size();
-    }
-    return activeBlocks;
+    return blocksMap.size() - (int)pendingDeletionBlocksCount;
   }
 
   DatanodeDescriptor[] getNodes(Block block) {
@@ -1312,8 +1330,11 @@ public class BlockManager {
    * Remove a datanode from the invalidatesSet
    * @param n datanode
    */
-  void removeFromInvalidates(DatanodeInfo n) {
-    recentInvalidateSets.remove(n.getStorageID());
+  void removeFromInvalidates(String storageID) {
+    Collection<Block> blocks = recentInvalidateSets.remove(storageID);
+    if (blocks != null) {
+      pendingDeletionBlocksCount -= blocks.size();
+    }
   }
 
   /**
@@ -1331,7 +1352,7 @@ public class BlockManager {
       assert nodeId != null;
       DatanodeDescriptor dn = namesystem.getDatanode(nodeId);
       if (dn == null) {
-        recentInvalidateSets.remove(nodeId);
+        removeFromInvalidates(nodeId);
         return 0;
       }
 
@@ -1351,8 +1372,9 @@ public class BlockManager {
       }
 
       // If we send everything in this message, remove this node entry
-      if (!it.hasNext())
-        recentInvalidateSets.remove(nodeId);
+      if (!it.hasNext()) {
+        removeFromInvalidates(nodeId);
+      }
 
       dn.addBlocksToBeInvalidated(blocksToInvalidate);
 
@@ -1397,4 +1419,14 @@ public class BlockManager {
   void removeBlockFromMap(BlockInfo blockInfo) {
     blocksMap.removeBlock(blockInfo);
   }
+  
+  public int getCapacity() {
+    synchronized(namesystem) {
+      return blocksMap.getCapacity();
+    }
+  }
+  
+  public float getLoadFactor() {
+    return blocksMap.getLoadFactor();
+  }
 }
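
For reference, a minimal, self-contained sketch of the bookkeeping pattern the BlockManager changes above rely on (hypothetical class and field names, not the HDFS code itself): a running counter is kept next to the map of per-datanode invalidate sets and is adjusted only when an add or remove actually changes a set, so a size query such as getActiveBlockCount() becomes a plain subtraction instead of a full iteration over every set.

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

public class InvalidateSetSketch {
  // storageID -> block ids queued for deletion on that datanode
  private final Map<String, Collection<Long>> invalidateSets =
      new HashMap<String, Collection<Long>>();
  // running total of all queued blocks, kept in step with the sets above
  private volatile long pendingDeletionCount = 0L;

  synchronized void addToInvalidates(String storageID, long blockId) {
    Collection<Long> set = invalidateSets.get(storageID);
    if (set == null) {
      set = new TreeSet<Long>();
      invalidateSets.put(storageID, set);
    }
    if (set.add(blockId)) {       // count only a real insertion
      pendingDeletionCount++;
    }
  }

  synchronized void removeFromInvalidates(String storageID, long blockId) {
    Collection<Long> set = invalidateSets.get(storageID);
    if (set != null && set.remove(blockId)) {   // count only a real removal
      pendingDeletionCount--;
      if (set.isEmpty()) {
        invalidateSets.remove(storageID);
      }
    }
  }

  /** O(1) replacement for iterating every set and summing its sizes. */
  long getPendingDeletionCount() {
    return pendingDeletionCount;
  }
}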

+ 28 - 2
src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java

@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -306,7 +305,20 @@ class BlocksMap {
     }
   }
 
-  private Map<Block, BlockInfo> map = new HashMap<Block, BlockInfo>();
+  // Used for tracking HashMap capacity growth
+  private int capacity;
+  private final float loadFactor;
+  
+  private Map<BlockInfo, BlockInfo> map;
+
+  BlocksMap(int initialCapacity, float loadFactor) {
+    this.capacity = 1;
+    // Capacity is initialized to the next power of two >= initialCapacity
+    while (this.capacity < initialCapacity)
+      this.capacity <<= 1;
+    this.loadFactor = loadFactor;
+    this.map = new HashMap<BlockInfo, BlockInfo>(initialCapacity, loadFactor);
+  }
 
   /**
    * Add BlockInfo if mapping does not exist.
@@ -437,4 +449,18 @@ class BlocksMap {
     
     return true;
   }
+  
+  /** Get the capacity of the HashMap that stores blocks */
+  public int getCapacity() {
+    // Capacity doubles every time the map size reaches the threshold
+    while (map.size() > (int)(capacity * loadFactor)) {
+      capacity <<= 1;
+    }
+    return capacity;
+  }
+  
+  /** Get the load factor of the map */
+  public float getLoadFactor() {
+    return loadFactor;
+  }
 }
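
The getCapacity()/getLoadFactor() additions mirror java.util.HashMap's growth rule rather than querying it, since HashMap does not expose its capacity. A standalone sketch of that rule follows (hypothetical names, and the exact resize point can differ slightly between JDK versions):

public class HashMapCapacityEstimator {
  private int capacity;
  private final float loadFactor;

  HashMapCapacityEstimator(int initialCapacity, float loadFactor) {
    this.capacity = 1;
    while (this.capacity < initialCapacity) {
      this.capacity <<= 1;        // HashMap rounds up to a power of two
    }
    this.loadFactor = loadFactor;
  }

  /** Estimated capacity of a HashMap holding {@code size} entries. */
  int capacityFor(int size) {
    while (size > (int) (capacity * loadFactor)) {
      capacity <<= 1;             // doubles once the threshold is exceeded
    }
    return capacity;
  }

  public static void main(String[] args) {
    HashMapCapacityEstimator e = new HashMapCapacityEstimator(16, 0.75f);
    System.out.println(e.capacityFor(12));   // 16: 12 <= 16 * 0.75
    System.out.println(e.capacityFor(13));   // 32: threshold of 12 exceeded
    System.out.println(e.capacityFor(32));   // 64: 32 > 32 * 0.75 = 24
  }
}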

+ 0 - 8
src/java/org/apache/hadoop/hdfs/server/namenode/CorruptReplicasMap.java

@@ -61,10 +61,6 @@ public class CorruptReplicasMap{
                                    "on " + dn.getName() +
                                    " by " + Server.getRemoteIp());
     }
-    if (NameNode.getNameNodeMetrics() != null) {
-      NameNode.getNameNodeMetrics().numBlocksCorrupted.set(
-        corruptReplicasMap.size());
-    }
   }
 
   /**
@@ -75,10 +71,6 @@ public class CorruptReplicasMap{
   void removeFromCorruptReplicasMap(Block blk) {
     if (corruptReplicasMap != null) {
       corruptReplicasMap.remove(blk);
-      if (NameNode.getNameNodeMetrics() != null) {
-        NameNode.getNameNodeMetrics().numBlocksCorrupted.set(
-          corruptReplicasMap.size());
-      }
     }
   }
 

+ 15 - 3
src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -2146,7 +2146,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
 
   void unprotectedRemoveDatanode(DatanodeDescriptor nodeDescr) {
     nodeDescr.resetBlocks();
-    blockManager.removeFromInvalidates(nodeDescr);
+    blockManager.removeFromInvalidates(nodeDescr.getStorageID());
     NameNode.stateChangeLog.debug(
                                   "BLOCK* NameSystem.unprotectedRemoveDatanode: "
                                   + nodeDescr.getName() + " is out of service now.");
@@ -2419,7 +2419,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
       return new long[] {this.capacityTotal, this.capacityUsed, 
                          this.capacityRemaining,
                          getUnderReplicatedBlocks(),
-                         getCorruptReplicaBlocksCount(),
+                         getCorruptReplicaBlocks(),
                          getMissingBlocksCount()};
     }
   }
@@ -3469,7 +3469,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
   }
 
   /** Returns number of blocks with corrupt replicas */
-  public long getCorruptReplicaBlocksCount() {
+  public long getCorruptReplicaBlocks() {
     return blockManager.corruptReplicaBlocksCount;
   }
 
@@ -3477,6 +3477,18 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
     return blockManager.scheduledReplicationBlocksCount;
   }
 
+  public long getPendingDeletionBlocks() {
+    return blockManager.pendingDeletionBlocksCount;
+  }
+
+  public long getExcessBlocks() {
+    return blockManager.excessBlocksCount;
+  }
+  
+  public int getBlockCapacity() {
+    return blockManager.getCapacity();
+  }
+
   public String getFSState() {
     return isInSafeMode() ? "safeMode" : "Operational";
   }

+ 3 - 2
src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java

@@ -281,11 +281,12 @@ public class LeaseManager {
                ", replaceBy=" + replaceBy);
     }
 
+    final int len = overwrite.length();
     for(Map.Entry<String, Lease> entry : findLeaseWithPrefixPath(src, sortedLeasesByPath)) {
       final String oldpath = entry.getKey();
       final Lease lease = entry.getValue();
-      final String newpath = oldpath.replaceFirst(
-          java.util.regex.Pattern.quote(overwrite), replaceBy);
+      //overwrite must be a prefix of oldpath
+      final String newpath = replaceBy + oldpath.substring(len);
       if (LOG.isDebugEnabled()) {
         LOG.debug("changeLease: replacing " + oldpath + " with " + newpath);
       }
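
The switch from replaceFirst() to plain prefix substitution matters because the replacement string of String.replaceFirst() treats '$' as a group reference; the new TestRenameWhileOpen case below renames a file to "$ " to exercise exactly that. A quick standalone demo (hypothetical class, not LeaseManager itself):

public class LeasePathRenameDemo {
  public static void main(String[] args) {
    String oldPath = "/user/dir3/file3";
    String overwrite = "/user/dir3";   // known to be a prefix of oldPath
    String replaceBy = "/user/$ ";     // destination name containing '$'

    // New approach from the patch: simple prefix substitution.
    String newPath = replaceBy + oldPath.substring(overwrite.length());
    System.out.println(newPath);       // prints /user/$ /file3

    // Old approach: the regex engine rejects '$ ' in the replacement string.
    try {
      System.out.println(oldPath.replaceFirst(
          java.util.regex.Pattern.quote(overwrite), replaceBy));
    } catch (RuntimeException e) {
      System.out.println("replaceFirst failed: " + e);  // illegal group reference
    }
  }
}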

+ 19 - 12
src/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMetrics.java

@@ -43,19 +43,22 @@ import org.apache.hadoop.metrics.util.MetricsRegistry;
 public class FSNamesystemMetrics implements Updater {
   private static Log log = LogFactory.getLog(FSNamesystemMetrics.class);
   private final MetricsRecord metricsRecord;
-  private final MetricsRegistry registry = new MetricsRegistry();
-
+  final MetricsRegistry registry = new MetricsRegistry();
    
-  private final MetricsIntValue filesTotal = new MetricsIntValue("FilesTotal", registry);
-  private final MetricsLongValue blocksTotal = new MetricsLongValue("BlocksTotal", registry);
-  private final MetricsIntValue capacityTotalGB = new MetricsIntValue("CapacityTotalGB", registry);
-  private final MetricsIntValue capacityUsedGB = new MetricsIntValue("CapacityUsedGB", registry);
-  private final MetricsIntValue capacityRemainingGB = new MetricsIntValue("CapacityRemainingGB", registry);
-  private final MetricsIntValue totalLoad = new MetricsIntValue("TotalLoad", registry);
-  private final MetricsIntValue pendingReplicationBlocks = new MetricsIntValue("PendingReplicationBlocks", registry);
-  private final MetricsIntValue underReplicatedBlocks = new MetricsIntValue("UnderReplicatedBlocks", registry);
-  private final MetricsIntValue scheduledReplicationBlocks = new MetricsIntValue("ScheduledReplicationBlocks", registry);
-  private final MetricsIntValue missingBlocks = new MetricsIntValue("MissingBlocks", registry);    
+  final MetricsIntValue filesTotal = new MetricsIntValue("FilesTotal", registry);
+  final MetricsLongValue blocksTotal = new MetricsLongValue("BlocksTotal", registry);
+  final MetricsIntValue capacityTotalGB = new MetricsIntValue("CapacityTotalGB", registry);
+  final MetricsIntValue capacityUsedGB = new MetricsIntValue("CapacityUsedGB", registry);
+  final MetricsIntValue capacityRemainingGB = new MetricsIntValue("CapacityRemainingGB", registry);
+  final MetricsIntValue totalLoad = new MetricsIntValue("TotalLoad", registry);
+  final MetricsIntValue pendingDeletionBlocks = new MetricsIntValue("PendingDeletionBlocks", registry);
+  final MetricsIntValue corruptBlocks = new MetricsIntValue("CorruptBlocks", registry);
+  final MetricsIntValue excessBlocks = new MetricsIntValue("ExcessBlocks", registry);
+  final MetricsIntValue pendingReplicationBlocks = new MetricsIntValue("PendingReplicationBlocks", registry);
+  final MetricsIntValue underReplicatedBlocks = new MetricsIntValue("UnderReplicatedBlocks", registry);
+  final MetricsIntValue scheduledReplicationBlocks = new MetricsIntValue("ScheduledReplicationBlocks", registry);
+  final MetricsIntValue missingBlocks = new MetricsIntValue("MissingBlocks", registry);    
+  final MetricsIntValue blockCapacity = new MetricsIntValue("BlockCapacity", registry);
 
   private final FSNamesystem fsNameSystem;
 
@@ -103,12 +106,16 @@ public class FSNamesystemMetrics implements Updater {
       capacityRemainingGB.set(roundBytesToGBytes(fsNameSystem.
                                                getCapacityRemaining()));
       totalLoad.set(fsNameSystem.getTotalLoad());
+      corruptBlocks.set((int)fsNameSystem.getCorruptReplicaBlocks());
+      excessBlocks.set((int)fsNameSystem.getExcessBlocks());
+      pendingDeletionBlocks.set((int)fsNameSystem.getPendingDeletionBlocks());
       pendingReplicationBlocks.set((int)fsNameSystem.
                                    getPendingReplicationBlocks());
       underReplicatedBlocks.set((int)fsNameSystem.getUnderReplicatedBlocks());
       scheduledReplicationBlocks.set((int)fsNameSystem.
                                       getScheduledReplicationBlocks());
       missingBlocks.set((int)fsNameSystem.getMissingBlocksCount());
+      blockCapacity.set(fsNameSystem.getBlockCapacity());
 
       for (MetricsBase m : registry.getMetricsList()) {
         m.pushMetric(metricsRecord);
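
For context, a minimal sketch of the Updater wiring the new gauges rely on, assuming the old org.apache.hadoop.metrics framework used elsewhere in this patch (class, record, and field names here are hypothetical): values are sampled in doUpdates() and every metric registered in the MetricsRegistry is pushed to the metrics record.

import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
import org.apache.hadoop.metrics.util.MetricsBase;
import org.apache.hadoop.metrics.util.MetricsIntValue;
import org.apache.hadoop.metrics.util.MetricsRegistry;

public class BlockGaugesSketch implements Updater {
  final MetricsRegistry registry = new MetricsRegistry();
  final MetricsIntValue excessBlocks = new MetricsIntValue("ExcessBlocks", registry);
  final MetricsIntValue pendingDeletionBlocks =
      new MetricsIntValue("PendingDeletionBlocks", registry);
  private final MetricsRecord metricsRecord;
  // In the real code these samples come from FSNamesystem getters.
  volatile int excessSample, pendingDeletionSample;

  public BlockGaugesSketch() {
    MetricsContext context = MetricsUtil.getContext("dfs");
    metricsRecord = MetricsUtil.createRecord(context, "blockgauges");
    context.registerUpdater(this);     // doUpdates() is then called periodically
  }

  public void doUpdates(MetricsContext unused) {
    excessBlocks.set(excessSample);            // sample the live counters
    pendingDeletionBlocks.set(pendingDeletionSample);
    for (MetricsBase m : registry.getMetricsList()) {
      m.pushMetric(metricsRecord);             // push every registered gauge
    }
    metricsRecord.update();
  }
}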

+ 11 - 0
src/test/hdfs/org/apache/hadoop/hdfs/TestRenameWhileOpen.java

@@ -82,6 +82,17 @@ public class TestRenameWhileOpen extends junit.framework.TestCase {
       fs.mkdirs(dir3);
       fs.rename(dir1, dir3);
 
+      // create file3
+      Path file3 = new Path(dir3, "file3");
+      FSDataOutputStream stm3 = TestFileCreation.createFile(fs, file3, 1);
+      TestFileCreation.writeFile(stm3);
+      // rename file3 to some bad name
+      try {
+        fs.rename(file3, new Path(dir3, "$ "));
+      } catch(Exception e) {
+        e.printStackTrace();
+      }
+      
       // restart cluster with the same namenode port as before.
       // This ensures that leases are persisted in fsimage.
       cluster.shutdown();

+ 152 - 0
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java

@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.metrics;
+
+import java.io.IOException;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.namenode.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+
+/**
+ * Test for metrics published by the Namenode
+ */
+public class TestNameNodeMetrics extends TestCase {
+  private static final Configuration CONF = new Configuration();
+  static {
+    CONF.setLong("dfs.block.size", 100);
+    CONF.setInt("io.bytes.per.checksum", 1);
+    CONF.setLong("dfs.heartbeat.interval", 1L);
+    CONF.setInt("dfs.replication.interval", 1);
+  }
+  
+  private MiniDFSCluster cluster;
+  private FSNamesystemMetrics metrics;
+  private DistributedFileSystem fs;
+  private Random rand = new Random();
+  private FSNamesystem namesystem;
+
+  @Override
+  protected void setUp() throws Exception {
+    cluster = new MiniDFSCluster(CONF, 3, true, null);
+    cluster.waitActive();
+    namesystem = cluster.getNameNode().getNamesystem();
+    fs = (DistributedFileSystem) cluster.getFileSystem();
+    metrics = namesystem.getFSNamesystemMetrics();
+  }
+  
+  @Override
+  protected void tearDown() throws Exception {
+    cluster.shutdown();
+  }
+  
+  /** create a file with a length of <code>fileLen</code> */
+  private void createFile(String fileName, long fileLen, short replicas) throws IOException {
+    Path filePath = new Path(fileName);
+    DFSTestUtil.createFile(fs, filePath, fileLen, replicas, rand.nextLong());
+  }
+
+  private void updateMetrics() throws Exception {
+    // Wait for metrics update (corresponds to dfs.replication.interval
+    // for some block related metrics to get updated)
+    Thread.sleep(1000);
+    metrics.doUpdates(null);
+  }
+
+  /** Test metrics associated with addition of a file */
+  public void testFileAdd() throws Exception {
+    // Add a file of 32 blocks (3200 bytes with a 100-byte block size)
+    final String file = "/tmp/t";
+    createFile(file, 3200, (short)3);
+    final int blockCount = 32;
+    int blockCapacity = namesystem.getBlockCapacity();
+    updateMetrics();
+    assertEquals(blockCapacity, metrics.blockCapacity.get());
+
+    // Blocks are stored in a hashmap. Compute its capacity, which
+    // doubles every time the number of entries exceeds the threshold.
+    int threshold = (int)(blockCapacity * BlockManager.DEFAULT_MAP_LOAD_FACTOR);
+    while (threshold < blockCount) {
+      blockCapacity <<= 1;
+      threshold = (int)(blockCapacity * BlockManager.DEFAULT_MAP_LOAD_FACTOR);
+    }
+    updateMetrics();
+    assertEquals(3, metrics.filesTotal.get());
+    assertEquals(blockCount, metrics.blocksTotal.get());
+    assertEquals(blockCapacity, metrics.blockCapacity.get());
+    fs.delete(new Path(file), true);
+  }
+  
+  /** Corrupt a block and ensure the metrics reflect it */
+  public void testCorruptBlock() throws Exception {
+    // Create a file with single block with two replicas
+    String file = "/tmp/t";
+    createFile(file, 100, (short)2);
+    
+    // Corrupt first replica of the block
+    LocatedBlock block = namesystem.getBlockLocations(file, 0, 1).get(0);
+    namesystem.markBlockAsCorrupt(block.getBlock(), block.getLocations()[0]);
+    updateMetrics();
+    assertEquals(1, metrics.corruptBlocks.get());
+    assertEquals(1, metrics.pendingReplicationBlocks.get());
+    assertEquals(1, metrics.scheduledReplicationBlocks.get());
+    fs.delete(new Path(file), true);
+    updateMetrics();
+    assertEquals(0, metrics.corruptBlocks.get());
+    assertEquals(0, metrics.pendingReplicationBlocks.get());
+    assertEquals(0, metrics.scheduledReplicationBlocks.get());
+  }
+  
+  /** Create excess blocks by reducing the replication factor of
+   * a file and ensure the metrics reflect it
+   */
+  public void testExcessBlocks() throws Exception {
+    String file = "/tmp/t";
+    createFile(file, 100, (short)2);
+    int totalBlocks = 1;
+    namesystem.setReplication(file, (short)1);
+    updateMetrics();
+    assertEquals(totalBlocks, metrics.excessBlocks.get());
+    assertEquals(totalBlocks, metrics.pendingDeletionBlocks.get());
+    fs.delete(new Path(file), true);
+  }
+  
+  /** Test to ensure the metrics reflect missing blocks */
+  public void testMissingBlock() throws Exception {
+    // Create a file with single block with two replicas
+    String file = "/tmp/t";
+    createFile(file, 100, (short)1);
+    
+    // Corrupt the only replica of the block to result in a missing block
+    LocatedBlock block = namesystem.getBlockLocations(file, 0, 1).get(0);
+    namesystem.markBlockAsCorrupt(block.getBlock(), block.getLocations()[0]);
+    updateMetrics();
+    assertEquals(1, metrics.underReplicatedBlocks.get());
+    assertEquals(1, metrics.missingBlocks.get());
+    fs.delete(new Path(file), true);
+    updateMetrics();
+    assertEquals(0, metrics.underReplicatedBlocks.get());
+  }
+}