
HDFS-5589. Namenode loops caching and uncaching when data should be uncached. (awang via cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1555996 13f79535-47bb-0310-9956-ffa450edef68
Colin McCabe committed 11 years ago
parent commit 8deb7a6057

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -598,6 +598,9 @@ Trunk (Unreleased)
 
     HDFS-5667. Include DatanodeStorage in StorageReport. (Arpit Agarwal)
 
+    HDFS-5589. Namenode loops caching and uncaching when data should be
+    uncached (awang via cmccabe)
+
 Release 2.4.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 147 - 23
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java

@@ -21,12 +21,14 @@ import static org.apache.hadoop.util.ExitUtil.terminate;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
+import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
@@ -76,7 +78,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
   /**
    * Pseudorandom number source
    */
-  private final Random random = new Random();
+  private static final Random random = new Random();
 
   /**
    * The interval at which we scan the namesystem for caching changes.
@@ -310,8 +312,6 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
     FSDirectory fsDir = namesystem.getFSDirectory();
     final long now = new Date().getTime();
     for (CacheDirective directive : cacheManager.getCacheDirectives()) {
-      // Reset the directive's statistics
-      directive.resetStatistics();
       // Skip processing this entry if it has expired
       if (LOG.isTraceEnabled()) {
         LOG.trace("Directive expiry is at " + directive.getExpiryTime());
@@ -461,7 +461,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
       // there may be a period of time when incomplete blocks remain cached
       // on the DataNodes.
       return "not complete";
-    }  else if (cblock.getReplication() == 0) {
+    } else if (cblock.getReplication() == 0) {
       // Since 0 is not a valid value for a cache directive's replication
       // field, seeing a replication of 0 on a CacheBlock means that it
       // has never been reached by any sweep.
@@ -469,6 +469,9 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
     } else if (cblock.getMark() != mark) { 
       // Although the block was needed in the past, we didn't reach it during
       // the current sweep.  Therefore, it doesn't need to be cached any more.
+      // Need to set the replication to 0 so it doesn't flip back to cached
+      // when the mark flips on the next scan
+      cblock.setReplicationAndMark((short)0, mark);
       return "no longer needed by any directives";
     }
     return null;
@@ -595,7 +598,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
    * @param pendingCached    A list of DataNodes that will soon cache the
    *                         block.
    */
-  private void addNewPendingCached(int neededCached,
+  private void addNewPendingCached(final int neededCached,
       CachedBlock cachedBlock, List<DatanodeDescriptor> cached,
       List<DatanodeDescriptor> pendingCached) {
     // To figure out which replicas can be cached, we consult the
@@ -616,35 +619,156 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
       }
       return;
     }
-    List<DatanodeDescriptor> possibilities = new LinkedList<DatanodeDescriptor>();
+    // Filter the list of replicas to only the valid targets
+    List<DatanodeDescriptor> possibilities =
+        new LinkedList<DatanodeDescriptor>();
     int numReplicas = blockInfo.getCapacity();
     Collection<DatanodeDescriptor> corrupt =
         blockManager.getCorruptReplicas(blockInfo);
+    int outOfCapacity = 0;
     for (int i = 0; i < numReplicas; i++) {
       DatanodeDescriptor datanode = blockInfo.getDatanode(i);
-      if ((datanode != null) && 
-          ((!pendingCached.contains(datanode)) &&
-          ((corrupt == null) || (!corrupt.contains(datanode))))) {
-        possibilities.add(datanode);
+      if (datanode == null) {
+        continue;
       }
-    }
-    while (neededCached > 0) {
-      if (possibilities.isEmpty()) {
-        LOG.warn("We need " + neededCached + " more replica(s) than " +
-            "actually exist to provide a cache replication of " +
-            cachedBlock.getReplication() + " for " + cachedBlock);
-        return;
+      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
+        continue;
       }
-      DatanodeDescriptor datanode =
-          possibilities.remove(random.nextInt(possibilities.size()));
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("AddNewPendingCached: datanode " + datanode + 
-            " will now cache block " + cachedBlock);
+      if (corrupt != null && corrupt.contains(datanode)) {
+        continue;
       }
+      if (pendingCached.contains(datanode) || cached.contains(datanode)) {
+        continue;
+      }
+      long pendingCapacity = datanode.getCacheRemaining();
+      // Subtract pending cached blocks from effective capacity
+      Iterator<CachedBlock> it = datanode.getPendingCached().iterator();
+      while (it.hasNext()) {
+        CachedBlock cBlock = it.next();
+        BlockInfo info =
+            blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
+        if (info != null) {
+          pendingCapacity -= info.getNumBytes();
+        }
+      }
+      // Add pending uncached blocks back to effective capacity
+      it = datanode.getPendingUncached().iterator();
+      while (it.hasNext()) {
+        CachedBlock cBlock = it.next();
+        BlockInfo info =
+            blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
+        if (info != null) {
+          pendingCapacity += info.getNumBytes();
+        }
+      }
+      if (pendingCapacity < blockInfo.getNumBytes()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Datanode " + datanode + " is not a valid possibility for"
+              + " block " + blockInfo.getBlockId() + " of size "
+              + blockInfo.getNumBytes() + " bytes, only has "
+              + pendingCapacity + " bytes of effective cache remaining.");
+        }
+        outOfCapacity++;
+        continue;
+      }
+      possibilities.add(datanode);
+    }
+    List<DatanodeDescriptor> chosen = chooseDatanodesForCaching(possibilities,
+        neededCached, blockManager.getDatanodeManager().getStaleInterval());
+    for (DatanodeDescriptor datanode : chosen) {
       pendingCached.add(datanode);
       boolean added = datanode.getPendingCached().add(cachedBlock);
       assert added;
-      neededCached--;
     }
+    // We were unable to satisfy the requested replication factor
+    if (neededCached > chosen.size()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Only have " +
+            (cachedBlock.getReplication() - neededCached + chosen.size()) +
+            " of " + cachedBlock.getReplication() + " cached replicas for " +
+            cachedBlock + " (" + outOfCapacity + " nodes have insufficient " +
+            "capacity).");
+      }
+    }
+  }
+
+  /**
+   * Chooses datanode locations for caching from a list of valid possibilities.
+   * Non-stale nodes are chosen before stale nodes.
+   * 
+   * @param possibilities List of candidate datanodes
+   * @param neededCached Number of replicas needed
+   * @param staleInterval Interval after which a datanode is considered stale
+   * @return A list of chosen datanodes
+   */
+  private static List<DatanodeDescriptor> chooseDatanodesForCaching(
+      final List<DatanodeDescriptor> possibilities, final int neededCached,
+      final long staleInterval) {
+    // Make a copy that we can modify
+    List<DatanodeDescriptor> targets =
+        new ArrayList<DatanodeDescriptor>(possibilities);
+    // Selected targets
+    List<DatanodeDescriptor> chosen = new LinkedList<DatanodeDescriptor>();
+
+    // Filter out stale datanodes
+    List<DatanodeDescriptor> stale = new LinkedList<DatanodeDescriptor>();
+    Iterator<DatanodeDescriptor> it = targets.iterator();
+    while (it.hasNext()) {
+      DatanodeDescriptor d = it.next();
+      if (d.isStale(staleInterval)) {
+        it.remove();
+        stale.add(d);
+      }
+    }
+    // Select targets
+    while (chosen.size() < neededCached) {
+      // Try to use stale nodes if we're out of non-stale nodes, else we're done
+      if (targets.isEmpty()) {
+        if (!stale.isEmpty()) {
+          targets = stale;
+        } else {
+          break;
+        }
+      }
+      // Select a random target
+      DatanodeDescriptor target =
+          chooseRandomDatanodeByRemainingCapacity(targets);
+      chosen.add(target);
+      targets.remove(target);
+    }
+    return chosen;
+  }
+
+  /**
+   * Choose a single datanode from the provided list of possible
+   * targets, weighted by the percentage of cache capacity remaining.
+   * 
+   * @return The chosen datanode
+   */
+  private static DatanodeDescriptor chooseRandomDatanodeByRemainingCapacity(
+      final List<DatanodeDescriptor> targets) {
+    // Use a weighted probability to choose the target datanode
+    float total = 0;
+    for (DatanodeDescriptor d : targets) {
+      total += d.getCacheRemainingPercent();
+    }
+    // Give each datanode a portion of keyspace equal to its relative weight
+    // [0, w1) selects d1, [w1, w1 + w2) selects d2, etc.
+    TreeMap<Integer, DatanodeDescriptor> lottery =
+        new TreeMap<Integer, DatanodeDescriptor>();
+    int offset = 0;
+    for (DatanodeDescriptor d : targets) {
+      // Since we're using floats, guard against zero or negative weights
+      int weight =
+          Math.max(1, (int)((d.getCacheRemainingPercent() / total) * 1000000));
+      offset += weight;
+      lottery.put(offset, d);
+    }
+    // Choose a number from [0, offset), which is the total amount of weight,
+    // to select the winner
+    DatanodeDescriptor winner =
+        lottery.higherEntry(random.nextInt(offset)).getValue();
+    return winner;
   }
 }
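The capacity filter added to addNewPendingCached() treats a datanode's usable cache as its reported free cache minus bytes already queued for caching plus bytes already queued for uncaching, which is what stops the NameNode from repeatedly re-issuing CACHE commands a node cannot honor. Below is a minimal, self-contained sketch of that accounting; PendingState and the class name are hypothetical stand-ins for DatanodeDescriptor#getCacheRemaining() plus its pending-cached and pending-uncached lists, not the real HDFS types.

import java.util.Arrays;
import java.util.List;

public class EffectiveCacheCapacitySketch {

  // Hypothetical snapshot of one datanode's cache bookkeeping,
  // reduced to block sizes in bytes.
  static class PendingState {
    final long cacheRemainingBytes;        // free cache the DN reports
    final List<Long> pendingCachedBytes;   // sizes of blocks queued to cache
    final List<Long> pendingUncachedBytes; // sizes of blocks queued to uncache

    PendingState(long remaining, List<Long> pendingCached,
        List<Long> pendingUncached) {
      this.cacheRemainingBytes = remaining;
      this.pendingCachedBytes = pendingCached;
      this.pendingUncachedBytes = pendingUncached;
    }
  }

  // Effective capacity = reported free cache
  //                      - bytes already queued for caching
  //                      + bytes already queued for uncaching.
  static long effectiveCapacity(PendingState dn) {
    long capacity = dn.cacheRemainingBytes;
    for (long bytes : dn.pendingCachedBytes) {
      capacity -= bytes;
    }
    for (long bytes : dn.pendingUncachedBytes) {
      capacity += bytes;
    }
    return capacity;
  }

  // A datanode stays in the candidate list only if the block still fits.
  static boolean canCache(PendingState dn, long blockBytes) {
    return effectiveCapacity(dn) >= blockBytes;
  }

  public static void main(String[] args) {
    // 64 MB free, 48 MB already queued to cache, 16 MB queued to uncache:
    // only 32 MB of effective capacity remains.
    PendingState dn = new PendingState(64L << 20,
        Arrays.asList(48L << 20), Arrays.asList(16L << 20));
    System.out.println(canCache(dn, 40L << 20)); // false
    System.out.println(canCache(dn, 24L << 20)); // true
  }
}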

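chooseRandomDatanodeByRemainingCapacity() selects a target by giving each candidate a slice of an integer keyspace proportional to its remaining cache percentage and then drawing one random number; TreeMap.higherEntry() returns the slice containing the draw. The following sketch shows the same lottery technique in isolation, with a hypothetical Candidate class standing in for DatanodeDescriptor.

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.TreeMap;

public class WeightedLotterySketch {

  // Hypothetical candidate, reduced to the one field the lottery needs.
  static class Candidate {
    final String name;
    final float cacheRemainingPercent;

    Candidate(String name, float cacheRemainingPercent) {
      this.name = name;
      this.cacheRemainingPercent = cacheRemainingPercent;
    }
  }

  private static final Random RANDOM = new Random();

  static Candidate chooseByRemainingCapacity(List<Candidate> targets) {
    float total = 0;
    for (Candidate c : targets) {
      total += c.cacheRemainingPercent;
    }
    // Map the end offset of each candidate's slice of the keyspace to the
    // candidate: [0, w1) -> c1, [w1, w1 + w2) -> c2, and so on.
    TreeMap<Integer, Candidate> lottery = new TreeMap<Integer, Candidate>();
    int offset = 0;
    for (Candidate c : targets) {
      // A minimum weight of 1 ensures float rounding can never produce an
      // unselectable candidate or an empty keyspace.
      int weight = Math.max(1,
          (int) ((c.cacheRemainingPercent / total) * 1000000));
      offset += weight;
      lottery.put(offset, c);
    }
    // Draw from [0, offset); higherEntry() finds the first slice whose end
    // offset is strictly greater than the draw, i.e. the slice containing it.
    return lottery.higherEntry(RANDOM.nextInt(offset)).getValue();
  }

  public static void main(String[] args) {
    List<Candidate> targets = Arrays.asList(new Candidate("dn1", 80f),
        new Candidate("dn2", 15f), new Candidate("dn3", 5f));
    // dn1 should win roughly 80% of the draws.
    System.out.println(chooseByRemainingCapacity(targets).name);
  }
}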
+ 45 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java

@@ -43,10 +43,13 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.LogVerificationAppender;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -109,8 +112,9 @@ public class TestFsDatasetCache {
   public void setUp() throws Exception {
     assumeTrue(!Path.WINDOWS);
     conf = new HdfsConfiguration();
-    conf.setLong(DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS,
-        500);
+    conf.setLong(
+        DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 100);
+    conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 500);
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
         CACHE_CAPACITY);
@@ -328,7 +332,7 @@ public class TestFsDatasetCache {
 
     // Create some test files that will exceed total cache capacity
     final int numFiles = 5;
-    final long fileSize = 15000;
+    final long fileSize = CACHE_CAPACITY / (numFiles-1);
 
     final Path[] testFiles = new Path[numFiles];
     final HdfsBlockLocation[][] fileLocs = new HdfsBlockLocation[numFiles][];
@@ -477,4 +481,42 @@ public class TestFsDatasetCache {
     setHeartbeatResponse(uncacheBlocks(locs));
     verifyExpectedCacheUsage(0, 0);
   }
+
+  @Test(timeout=60000)
+  public void testUncacheQuiesces() throws Exception {
+    // Create a file
+    Path fileName = new Path("/testUncacheQuiesces");
+    int fileLen = 4096;
+    DFSTestUtil.createFile(fs, fileName, fileLen, (short)1, 0xFDFD);
+    // Cache it
+    DistributedFileSystem dfs = cluster.getFileSystem();
+    dfs.addCachePool(new CachePoolInfo("pool"));
+    dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
+        .setPool("pool").setPath(fileName).setReplication((short)3).build());
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
+        long blocksCached =
+            MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
+        return blocksCached > 0;
+      }
+    }, 1000, 30000);
+    // Uncache it
+    dfs.removeCacheDirective(1);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
+        long blocksUncached =
+            MetricsAsserts.getLongCounter("BlocksUncached", dnMetrics);
+        return blocksUncached > 0;
+      }
+    }, 1000, 30000);
+    // Make sure that no additional messages were sent
+    Thread.sleep(10000);
+    MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
+    MetricsAsserts.assertCounter("BlocksCached", 1l, dnMetrics);
+    MetricsAsserts.assertCounter("BlocksUncached", 1l, dnMetrics);
+  }
 }

+ 52 - 13
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java

@@ -57,17 +57,18 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.LogVerificationAppender;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolStats;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.CachePoolStats;
 import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -81,6 +82,7 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.GSet;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -603,8 +605,8 @@ public class TestCacheDirectives {
    * Wait for the NameNode to have an expected number of cached blocks
    * and replicas.
    * @param nn NameNode
-   * @param expectedCachedBlocks
-   * @param expectedCachedReplicas
+   * @param expectedCachedBlocks if -1, treat as wildcard
+   * @param expectedCachedReplicas if -1, treat as wildcard
    * @throws Exception
    */
   private static void waitForCachedBlocks(NameNode nn,
@@ -633,16 +635,18 @@ public class TestCacheDirectives {
         } finally {
           namesystem.readUnlock();
         }
-        if ((numCachedBlocks == expectedCachedBlocks) && 
-            (numCachedReplicas == expectedCachedReplicas)) {
-          return true;
-        } else {
-          LOG.info(logString + " cached blocks: have " + numCachedBlocks +
-              " / " + expectedCachedBlocks + ".  " +
-              "cached replicas: have " + numCachedReplicas +
-              " / " + expectedCachedReplicas);
-          return false;
+        if (expectedCachedBlocks == -1 ||
+            numCachedBlocks == expectedCachedBlocks) {
+          if (expectedCachedReplicas == -1 ||
+              numCachedReplicas == expectedCachedReplicas) {
+            return true;
+          }
         }
+        LOG.info(logString + " cached blocks: have " + numCachedBlocks +
+            " / " + expectedCachedBlocks + ".  " +
+            "cached replicas: have " + numCachedReplicas +
+            " / " + expectedCachedReplicas);
+        return false;
       }
     }, 500, 60000);
   }
@@ -1351,4 +1355,39 @@ public class TestCacheDirectives {
         .setExpiration(Expiration.newRelative(RELATIVE_EXPIRY_NEVER - 1))
         .build());
   }
+
+  @Test(timeout=60000)
+  public void testExceedsCapacity() throws Exception {
+    // Create a giant file
+    final Path fileName = new Path("/exceeds");
+    final long fileLen = CACHE_CAPACITY * (NUM_DATANODES*2);
+    int numCachedReplicas = (int) ((CACHE_CAPACITY*NUM_DATANODES)/BLOCK_SIZE);
+    DFSTestUtil.createFile(dfs, fileName, fileLen, (short) NUM_DATANODES,
+        0xFADED);
+    // Set up a log appender watcher
+    final LogVerificationAppender appender = new LogVerificationAppender();
+    final Logger logger = Logger.getRootLogger();
+    logger.addAppender(appender);
+    dfs.addCachePool(new CachePoolInfo("pool"));
+    dfs.addCacheDirective(new CacheDirectiveInfo.Builder().setPool("pool")
+        .setPath(fileName).setReplication((short) 1).build());
+    waitForCachedBlocks(namenode, -1, numCachedReplicas,
+        "testExceeds:1");
+    // Check that no DNs saw an excess CACHE message
+    int lines = appender.countLinesWithMessage(
+        "more bytes in the cache: " +
+        DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY);
+    assertEquals("Namenode should not send extra CACHE commands", 0, lines);
+    // Try creating a file with giant-sized blocks that exceed cache capacity
+    dfs.delete(fileName, false);
+    DFSTestUtil.createFile(dfs, fileName, 4096, fileLen, CACHE_CAPACITY * 2,
+        (short) 1, 0xFADED);
+    // Nothing will get cached, so just force sleep for a bit
+    Thread.sleep(4000);
+    // Still should not see any excess commands
+    lines = appender.countLinesWithMessage(
+        "more bytes in the cache: " +
+        DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY);
+    assertEquals("Namenode should not send extra CACHE commands", 0, lines);
+  }
 }