HDFS-8646. Prune cached replicas from DatanodeDescriptor state on replica invalidation.

(cherry picked from commit afe9ea3c12e1f5a71922400eadb642960bc87ca1)
Andrew Wang 10 years ago
parent
commit
431f685300

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -613,6 +613,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8542. WebHDFS getHomeDirectory behavior does not match specification.
     (Kanaka Kumar Avvaru via jghoman)
 
+    HDFS-8646. Prune cached replicas from DatanodeDescriptor state on replica
+    invalidation. (wang)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 14 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBloc
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -3096,6 +3097,19 @@ public class BlockManager {
         return;
       }
 
+      CachedBlock cblock = namesystem.getCacheManager().getCachedBlocks()
+          .get(new CachedBlock(block.getBlockId(), (short) 0, false));
+      if (cblock != null) {
+        boolean removed = false;
+        removed |= node.getPendingCached().remove(cblock);
+        removed |= node.getCached().remove(cblock);
+        removed |= node.getPendingUncached().remove(cblock);
+        if (removed) {
+          blockLog.debug("BLOCK* removeStoredBlock: {} removed from caching "
+              + "related lists on node {}", block, node);
+        }
+      }
+
       //
       // It's possible that the block was removed because of a datanode
       // failure. If the block is still valid, check if replication is
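Implementation note: the get() above uses a throwaway CachedBlock as a probe key, passing a placeholder replication of (short) 0 and mark of false. This works because CachedBlock keys its equals()/hashCode() on the block ID alone, so the other constructor arguments never affect the lookup. A minimal sketch of the pattern, using a simplified stand-in class and a HashMap in place of the CacheManager's GSet (FakeCachedBlock and ProbeKeyDemo are illustrative names, not the real HDFS classes):

import java.util.HashMap;
import java.util.Map;

public class ProbeKeyDemo {
  // Simplified stand-in for CachedBlock: equality is keyed on the block ID
  // alone, so replication and mark are irrelevant when the object is used
  // purely as a lookup probe.
  static final class FakeCachedBlock {
    final long blockId;
    final short replication; // ignored by equals/hashCode
    final boolean mark;      // ignored by equals/hashCode

    FakeCachedBlock(long blockId, short replication, boolean mark) {
      this.blockId = blockId;
      this.replication = replication;
      this.mark = mark;
    }

    @Override
    public int hashCode() {
      return (int) (blockId ^ (blockId >>> 32));
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof FakeCachedBlock
          && ((FakeCachedBlock) o).blockId == blockId;
    }
  }

  public static void main(String[] args) {
    Map<FakeCachedBlock, String> cached =
        new HashMap<FakeCachedBlock, String>();
    cached.put(new FakeCachedBlock(42L, (short) 3, true), "cached state");
    // A probe built with dummy replication/mark values still finds the
    // entry, which is exactly what the (short) 0 / false arguments rely on.
    System.out.println(cached.get(new FakeCachedBlock(42L, (short) 0, false)));
  }
}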

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

@@ -709,8 +709,10 @@ class BPServiceActor implements Runnable {
         }
         processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
 
-        DatanodeCommand cmd = cacheReport();
-        processCommand(new DatanodeCommand[]{ cmd });
+        if (!dn.areCacheReportsDisabledForTests()) {
+          DatanodeCommand cmd = cacheReport();
+          processCommand(new DatanodeCommand[]{ cmd });
+        }
 
         //
        // There is no work to do;  sleep until heartbeat timer elapses, 

+ 14 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -302,6 +302,7 @@ public class DataNode extends ReconfigurableBase
   ThreadGroup threadGroup = null;
   private DNConf dnConf;
   private volatile boolean heartbeatsDisabledForTests = false;
+  private volatile boolean cacheReportsDisabledForTests = false;
   private DataStorage storage = null;
 
   private DatanodeHttpServer httpServer = null;
@@ -1056,15 +1057,27 @@ public class DataNode extends ReconfigurableBase
 
   
   // used only for testing
+  @VisibleForTesting
   void setHeartbeatsDisabledForTests(
       boolean heartbeatsDisabledForTests) {
     this.heartbeatsDisabledForTests = heartbeatsDisabledForTests;
   }
-  
+
+  @VisibleForTesting
   boolean areHeartbeatsDisabledForTests() {
     return this.heartbeatsDisabledForTests;
   }
 
+  @VisibleForTesting
+  void setCacheReportsDisabledForTest(boolean disabled) {
+    this.cacheReportsDisabledForTests = disabled;
+  }
+
+  @VisibleForTesting
+  boolean areCacheReportsDisabledForTests() {
+    return this.cacheReportsDisabledForTests;
+  }
+
   /**
    * This method starts the data node with the specified conf.
    * 
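The new flag is declared volatile because it is written by the test thread (via DataNodeTestUtils) and read by the BPServiceActor heartbeat loop; volatile guarantees the loop thread sees the update rather than a stale cached value. A minimal sketch of this test-hook pattern under those assumptions (hypothetical class and method names, not DataNode itself):

public class TestHookDemo {
  // Written by one thread (the test), polled by another (the service loop).
  private volatile boolean reportsDisabled = false;

  void setReportsDisabled(boolean disabled) {
    this.reportsDisabled = disabled; // called from the test thread
  }

  void runLoopIteration() {
    if (!reportsDisabled) {
      // Send the periodic report; skipped while a test holds the flag.
      System.out.println("report sent");
    }
  }

  public static void main(String[] args) throws InterruptedException {
    final TestHookDemo demo = new TestHookDemo();
    Thread loop = new Thread(new Runnable() {
      @Override
      public void run() {
        for (int i = 0; i < 10; i++) {
          demo.runLoopIteration();
          try {
            Thread.sleep(50);
          } catch (InterruptedException e) {
            return;
          }
        }
      }
    });
    loop.start();
    Thread.sleep(120);
    demo.setReportsDisabled(true); // visible to the loop thread via volatile
    loop.join();
  }
}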

+ 21 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java

@@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
@@ -902,9 +903,26 @@ public final class CacheManager {
     if (cachedBlock == null) {
       return;
     }
-    List<DatanodeDescriptor> datanodes = cachedBlock.getDatanodes(Type.CACHED);
-    for (DatanodeDescriptor datanode : datanodes) {
-      block.addCachedLoc(datanode);
+    List<DatanodeDescriptor> cachedDNs = cachedBlock.getDatanodes(Type.CACHED);
+    for (DatanodeDescriptor datanode : cachedDNs) {
+      // Filter out cached blocks that do not have a backing replica.
+      //
+      // This should not happen since it means the CacheManager thinks
+      // something is cached that does not exist, but it's a safety
+      // measure.
+      boolean found = false;
+      for (DatanodeInfo loc : block.getLocations()) {
+        if (loc.equals(datanode)) {
+          block.addCachedLoc(loc);
+          found = true;
+          break;
+        }
+      }
+      if (!found) {
+        LOG.warn("Datanode {} is not a valid cache location for block {} "
+            + "because that node does not have a backing replica!",
+            datanode, block.getBlock().getBlockName());
+      }
     }
   }
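Client-side, the locations that survive this filter are what eventually surface through BlockLocation#getCachedHosts(). A hedged sketch of how a caller would observe them, assuming an HDFS cluster with centralized caching enabled and a hypothetical file path:

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CachedHostsDemo {
  public static void main(String[] args) throws Exception {
    // Assumes fs.defaultFS points at an HDFS cluster.
    FileSystem fs = FileSystem.get(new Configuration());
    FileStatus stat = fs.getFileStatus(new Path("/some/cached/file"));
    for (BlockLocation loc :
        fs.getFileBlockLocations(stat, 0, stat.getLen())) {
      // After the filter above, every cached host should also appear in
      // loc.getHosts(), i.e. it holds a backing replica.
      System.out.println(loc.getOffset() + ": cached on "
          + Arrays.toString(loc.getCachedHosts()));
    }
  }
}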
 

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -6455,6 +6455,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     this.dir = dir;
   }
   /** @return the cache manager. */
+  @Override
   public CacheManager getCacheManager() {
     return cacheManager;
   }

+ 10 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java

@@ -29,21 +29,23 @@ import org.apache.hadoop.security.AccessControlException;
 @InterfaceAudience.Private
 public interface Namesystem extends RwLock, SafeMode {
   /** Is this name system running? */
-  public boolean isRunning();
+  boolean isRunning();
 
   /** Check if the user has superuser privilege. */
-  public void checkSuperuserPrivilege() throws AccessControlException;
+  void checkSuperuserPrivilege() throws AccessControlException;
 
   /** @return the block pool ID */
-  public String getBlockPoolId();
+  String getBlockPoolId();
 
-  public boolean isInStandbyState();
+  boolean isInStandbyState();
 
-  public boolean isGenStampInFuture(Block block);
+  boolean isGenStampInFuture(Block block);
 
-  public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal);
+  void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal);
 
-  public void checkOperation(OperationCategory read) throws StandbyException;
+  void checkOperation(OperationCategory read) throws StandbyException;
 
-  public boolean isInSnapshot(BlockInfoUnderConstruction blockUC);
+  boolean isInSnapshot(BlockInfoUnderConstruction blockUC);
+
+  CacheManager getCacheManager();
 }
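The modifier cleanup in this hunk is behaviorally a no-op: interface methods in Java are implicitly public and abstract, so the public keyword was redundant. A trivial demonstration:

// Both interfaces declare the same API; the explicit modifier below adds
// nothing, since interface methods are implicitly public abstract.
interface WithModifier {
  public boolean isRunning();
}

interface WithoutModifier {
  boolean isRunning(); // identical visibility to the method above
}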

+ 18 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java

@@ -79,6 +79,7 @@ import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.FsShell;
@@ -540,6 +541,23 @@ public class DFSTestUtil {
     }
   }
 
+  public static void waitForReplication(final DistributedFileSystem dfs,
+      final Path file, final short replication, int waitForMillis)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          FileStatus stat = dfs.getFileStatus(file);
+          return replication == stat.getReplication();
+        } catch (IOException e) {
+          LOG.info("getFileStatus on path " + file + " failed!", e);
+          return false;
+        }
+      }
+    }, 100, waitForMillis);
+  }
+
   /**
    * Keep accessing the given file until the namenode reports that the
    * given block in the file contains the given number of corrupt replicas.
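For reference, a hedged sketch of how a test would use the new helper. It assumes the usual MiniDFSCluster fixtures (a running cluster and a DistributedFileSystem named dfs, as in TestCacheDirectives); the path, length, and seed are made up:

// Hypothetical test method; fixtures come from the enclosing test class.
@Test(timeout = 60000)
public void testWaitForReplicationUsage() throws Exception {
  final Path file = new Path("/replication-demo");
  DFSTestUtil.createFile(dfs, file, 1024, (short) 3, 0xBEEF);
  dfs.setReplication(file, (short) 1);
  // Polls dfs.getFileStatus(file) every 100 ms and returns once the
  // reported replication matches, or throws TimeoutException after 30 s.
  DFSTestUtil.waitForReplication(dfs, file, (short) 1, 30000);
}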

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java

@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -53,6 +54,16 @@ public class DataNodeTestUtils {
     dn.setHeartbeatsDisabledForTests(heartbeatsDisabledForTests);
   }
 
+  /**
+   * Set if cache reports are disabled for all DNs in a mini cluster.
+   */
+  public static void setCacheReportsDisabledForTests(MiniDFSCluster cluster,
+      boolean disabled) {
+    for (DataNode dn : cluster.getDataNodes()) {
+      dn.setCacheReportsDisabledForTest(disabled);
+    }
+  }
+
   public static void triggerDeletionReport(DataNode dn) throws IOException {
     for (BPOfferService bpos : dn.getAllBpOs()) {
       bpos.triggerDeletionReportForTests();

+ 25 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java

@@ -77,6 +77,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
@@ -1453,4 +1454,28 @@ public class TestCacheDirectives {
     Thread.sleep(1000);
     checkPendingCachedEmpty(cluster);
   }
+
+  @Test(timeout=60000)
+  public void testNoBackingReplica() throws Exception {
+    // Cache all three replicas for a file.
+    final Path filename = new Path("/noback");
+    final short replication = (short) 3;
+    DFSTestUtil.createFile(dfs, filename, 1, replication, 0x0BAC);
+    dfs.addCachePool(new CachePoolInfo("pool"));
+    dfs.addCacheDirective(
+        new CacheDirectiveInfo.Builder().setPool("pool").setPath(filename)
+            .setReplication(replication).build());
+    waitForCachedBlocks(namenode, 1, replication, "testNoBackingReplica:1");
+    // Pause cache reports while we change the replication factor.
+    // This will orphan some cached replicas.
+    DataNodeTestUtils.setCacheReportsDisabledForTests(cluster, true);
+    try {
+      dfs.setReplication(filename, (short) 1);
+      DFSTestUtil.waitForReplication(dfs, filename, (short) 1, 30000);
+      // The cache locations should drop down to 1 even without cache reports.
+      waitForCachedBlocks(namenode, 1, (short) 1, "testNoBackingReplica:2");
+    } finally {
+      DataNodeTestUtils.setCacheReportsDisabledForTests(cluster, false);
+    }
+  }
 }