Просмотр исходного кода

HDFS-6932. Balancer and Mover tools should ignore replicas on RAM_DISK. (Contributed by Xiaoyu Yao)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
arp 10 лет назад
Родитель
Сommit
69828a9bf0

+ 24 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs;
 
 import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -31,18 +32,34 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
 public enum StorageType {
-  DISK,
-  SSD,
-  ARCHIVE,
-  RAM_DISK;
+  DISK(false),
+  SSD(false),
+  ARCHIVE(false),
+  RAM_DISK(true);
+
+  private final boolean isTransient;
 
   public static final StorageType DEFAULT = DISK;
-  
+
   public static final StorageType[] EMPTY_ARRAY = {};
-  
+
   private static final StorageType[] VALUES = values();
-  
+
+  StorageType(boolean isTransient) { this.isTransient = isTransient; }
+
+  public boolean isMovable() { return isTransient == false; }
+
   public static List<StorageType> asList() {
     return Arrays.asList(VALUES);
   }
+
+  public static List<StorageType> getMovableTypes() {
+    List<StorageType> movableTypes = new ArrayList<StorageType>();
+    for (StorageType t : VALUES) {
+      if ( t.isTransient == false ) {
+        movableTypes.add(t);
+      }
+    }
+    return movableTypes;
+  }
 }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java

@@ -272,7 +272,7 @@ public class Balancer {
     long overLoadedBytes = 0L, underLoadedBytes = 0L;
     for(DatanodeStorageReport r : reports) {
       final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
-      for(StorageType t : StorageType.asList()) {
+      for(StorageType t : StorageType.getMovableTypes()) {
         final Double utilization = policy.getUtilization(r, t);
         if (utilization == null) { // datanode does not have such storage type 
           continue;

+ 23 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java

@@ -69,7 +69,7 @@ public class Mover {
         = new EnumMap<StorageType, List<StorageGroup>>(StorageType.class);
     
     private StorageMap() {
-      for(StorageType t : StorageType.asList()) {
+      for(StorageType t : StorageType.getMovableTypes()) {
         targetStorageTypeMap.put(t, new LinkedList<StorageGroup>());
       }
     }
@@ -130,7 +130,7 @@ public class Mover {
     final List<DatanodeStorageReport> reports = dispatcher.init();
     for(DatanodeStorageReport r : reports) {
       final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
-      for(StorageType t : StorageType.asList()) {
+      for(StorageType t : StorageType.getMovableTypes()) {
         final Source source = dn.addSource(t, Long.MAX_VALUE, dispatcher);
         final long maxRemaining = getMaxRemaining(r, t);
         final StorageGroup target = maxRemaining > 0L ? dn.addTarget(t,
@@ -348,7 +348,7 @@ public class Mover {
         LocatedBlock lb = lbs.get(i);
         final StorageTypeDiff diff = new StorageTypeDiff(types,
             lb.getStorageTypes());
-        if (!diff.removeOverlap()) {
+        if (!diff.removeOverlap(true)) {
           if (scheduleMoves4Block(diff, lb)) {
             hasRemaining |= (diff.existing.size() > 1 &&
                 diff.expected.size() > 1);
@@ -452,22 +452,38 @@ public class Mover {
       this.expected = new LinkedList<StorageType>(expected);
       this.existing = new LinkedList<StorageType>(Arrays.asList(existing));
     }
-    
+
     /**
      * Remove the overlap between the expected types and the existing types.
-     * @return if the existing types or the expected types is empty after
+     * @param  ignoreNonMovable ignore non-movable storage types
+     *         by removing them from both expected and existing storage type list
+     *         to prevent non-movable storage from being moved.
+     * @returns if the existing types or the expected types is empty after
      *         removing the overlap.
      */
-    boolean removeOverlap() { 
+    boolean removeOverlap(boolean ignoreNonMovable) {
       for(Iterator<StorageType> i = existing.iterator(); i.hasNext(); ) {
         final StorageType t = i.next();
         if (expected.remove(t)) {
           i.remove();
         }
       }
+      if (ignoreNonMovable) {
+        removeNonMovable(existing);
+        removeNonMovable(expected);
+      }
       return expected.isEmpty() || existing.isEmpty();
     }
-    
+
+    void removeNonMovable(List<StorageType> types) {
+      for (Iterator<StorageType> i = types.iterator(); i.hasNext(); ) {
+        final StorageType t = i.next();
+        if (!t.isMovable()) {
+          i.remove();
+        }
+      }
+    }
+
     @Override
     public String toString() {
       return getClass().getSimpleName() + "{expected=" + expected

+ 33 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java

@@ -1456,6 +1456,39 @@ public class DFSTestUtil {
     }
   }
 
+  /**
+   * Helper function that verified blocks of a file are placed on the
+   * expected storage type.
+   *
+   * @param fs The file system containing the the file.
+   * @param client The DFS client used to access the file
+   * @param path name to the file to verify
+   * @param storageType expected storage type
+   * @returns true if file exists and its blocks are located on the expected
+   *            storage type.
+   *          false otherwise.
+   */
+  public static boolean verifyFileReplicasOnStorageType(FileSystem fs,
+    DFSClient client, Path path, StorageType storageType) throws IOException {
+    if (!fs.exists(path)) {
+      LOG.info("verifyFileReplicasOnStorageType: file " + path + "does not exist");
+      return false;
+    }
+    long fileLength = client.getFileInfo(path.toString()).getLen();
+    LocatedBlocks locatedBlocks =
+      client.getLocatedBlocks(path.toString(), 0, fileLength);
+    for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+      if (locatedBlock.getStorageTypes()[0] != storageType) {
+        LOG.info("verifyFileReplicasOnStorageType: for file " + path +
+            ". Expect blk" + locatedBlock +
+          " on Type: " + storageType + ". Actual Type: " +
+          locatedBlock.getStorageTypes()[0]);
+        return false;
+      }
+    }
+    return true;
+  }
+
   /**
    * Helper function to create a key in the Key Provider. Defaults
    * to the first indexed NameNode's Key Provider.

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -1394,7 +1394,8 @@ public class MiniDFSCluster {
       // Set up datanode address
       setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig);
       if (manageDfsDirs) {
-        String dirs = makeDataNodeDirs(i, storageTypes == null ? null : storageTypes[i]);
+        String dirs = makeDataNodeDirs(i, storageTypes == null ?
+          null : storageTypes[i - curDatanodesNum]);
         dnConf.set(DFS_DATANODE_DATA_DIR_KEY, dirs);
         conf.set(DFS_DATANODE_DATA_DIR_KEY, dirs);
       }

+ 90 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java

@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.balancer;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
+import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -41,12 +44,7 @@ import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.hdfs.*;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -57,6 +55,7 @@ import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
 import org.apache.log4j.Level;
@@ -86,6 +85,7 @@ public class TestBalancer {
   static final double CAPACITY_ALLOWED_VARIANCE = 0.005;  // 0.5%
   static final double BALANCE_ALLOWED_VARIANCE = 0.11;    // 10%+delta
   static final int DEFAULT_BLOCK_SIZE = 100;
+  static final int DEFAULT_RAM_DISK_BLOCK_SIZE = 5 * 1024 * 1024;
   private static final Random r = new Random();
 
   static {
@@ -108,6 +108,15 @@ public class TestBalancer {
     conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
   }
 
+  static void initConfWithRamDisk(Configuration conf) {
+    conf.setLong(DFS_BLOCK_SIZE_KEY, DEFAULT_RAM_DISK_BLOCK_SIZE);
+    conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC, 3);
+    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
+    conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
+    conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS, 1);
+  }
+
   /* create a file with a length of <code>fileLen</code> */
   static void createFile(MiniDFSCluster cluster, Path filePath, long fileLen,
       short replicationFactor, int nnIndex)
@@ -1098,6 +1107,81 @@ public class TestBalancer {
         CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true);
   }
 
+  /*
+   * Test Balancer with Ram_Disk configured
+   * One DN has two files on RAM_DISK, other DN has no files on RAM_DISK.
+   * Then verify that the balancer does not migrate files on RAM_DISK across DN.
+   */
+  @Test(timeout=300000)
+  public void testBalancerWithRamDisk() throws Exception {
+    final int SEED = 0xFADED;
+    final short REPL_FACT = 1;
+    Configuration conf = new Configuration();
+    initConfWithRamDisk(conf);
+
+    final int defaultRamDiskCapacity = 10;
+    final int defaultDiskCapacity = 100;
+    final long ramDiskStorageLimit =
+      ((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) +
+      (DEFAULT_RAM_DISK_BLOCK_SIZE - 1);
+    final long diskStorageLimit =
+      ((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) +
+      (DEFAULT_RAM_DISK_BLOCK_SIZE - 1);
+
+    cluster = new MiniDFSCluster
+      .Builder(conf)
+      .numDataNodes(1)
+      .storageCapacities(new long[] { ramDiskStorageLimit, diskStorageLimit })
+      .storageTypes(new StorageType[] { RAM_DISK, DEFAULT })
+      .build();
+
+    try {
+      cluster.waitActive();
+      // Create few files on RAM_DISK
+      final String METHOD_NAME = GenericTestUtils.getMethodName();
+      final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+      final Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+
+      DistributedFileSystem fs = cluster.getFileSystem();
+      DFSClient client = fs.getClient();
+      DFSTestUtil.createFile(fs, path1, true,
+        DEFAULT_RAM_DISK_BLOCK_SIZE, 4 * DEFAULT_RAM_DISK_BLOCK_SIZE,
+        DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
+      DFSTestUtil.createFile(fs, path1, true,
+        DEFAULT_RAM_DISK_BLOCK_SIZE, 1 * DEFAULT_RAM_DISK_BLOCK_SIZE,
+        DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
+
+      // Sleep for a short time to allow the lazy writer thread to do its job
+      Thread.sleep(6 * 1000);
+
+      // Add another fresh DN with the same type/capacity without files on RAM_DISK
+      StorageType[][] storageTypes = new StorageType[][] {{RAM_DISK, DEFAULT}};
+      long[][] storageCapacities = new long[][]{{ramDiskStorageLimit, diskStorageLimit}};
+      cluster.startDataNodes(conf, REPL_FACT, storageTypes, true, null,
+        null, null, storageCapacities, null, false, false, false, null);
+
+      cluster.triggerHeartbeats();
+      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+
+      // Run Balancer
+      Balancer.Parameters p = new Balancer.Parameters(
+        Parameters.DEFAULT.policy,
+        Parameters.DEFAULT.threshold,
+        Parameters.DEFAULT.nodesToBeExcluded,
+        Parameters.DEFAULT.nodesToBeIncluded);
+      final int r = Balancer.run(namenodes, p, conf);
+
+      // Validate no RAM_DISK block should be moved
+      assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+
+      // Verify files are still on RAM_DISK
+      DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path1, RAM_DISK);
+      DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path2, RAM_DISK);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /**
    * @param args
    */

+ 92 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java

@@ -32,6 +32,7 @@ import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -67,6 +68,8 @@ import org.junit.Test;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
+
 /**
  * Test the data migration tool (for Archival Storage)
  */
@@ -326,10 +329,10 @@ public class TestStorageMover {
         Assert.assertTrue(fileStatus.getFullName(parent.toString())
             + " with policy " + policy + " has non-empty overlap: " + diff
             + ", the corresponding block is " + lb.getBlock().getLocalBlock(),
-            diff.removeOverlap());
+            diff.removeOverlap(true));
       }
     }
-    
+
     Replication getReplication(Path file) throws IOException {
       return getOrVerifyReplication(file, null);
     }
@@ -397,17 +400,29 @@ public class TestStorageMover {
   }
 
   private static StorageType[][] genStorageTypes(int numDataNodes) {
-    return genStorageTypes(numDataNodes, 0, 0);
+    return genStorageTypes(numDataNodes, 0, 0, 0);
   }
 
   private static StorageType[][] genStorageTypes(int numDataNodes,
       int numAllDisk, int numAllArchive) {
+    return genStorageTypes(numDataNodes, numAllDisk, numAllArchive, 0);
+  }
+
+  private static StorageType[][] genStorageTypes(int numDataNodes,
+      int numAllDisk, int numAllArchive, int numRamDisk) {
+    Preconditions.checkArgument(
+      (numAllDisk + numAllArchive + numRamDisk) <= numDataNodes);
+
     StorageType[][] types = new StorageType[numDataNodes][];
     int i = 0;
-    for (; i < numAllDisk; i++) {
+    for (; i < numRamDisk; i++)
+    {
+      types[i] = new StorageType[]{StorageType.RAM_DISK, StorageType.DISK};
+    }
+    for (; i < numRamDisk + numAllDisk; i++) {
       types[i] = new StorageType[]{StorageType.DISK, StorageType.DISK};
     }
-    for (; i < numAllDisk + numAllArchive; i++) {
+    for (; i < numRamDisk + numAllDisk + numAllArchive; i++) {
       types[i] = new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE};
     }
     for (; i < types.length; i++) {
@@ -416,6 +431,26 @@ public class TestStorageMover {
     return types;
   }
 
+  private static long[][] genCapacities(int nDatanodes, int numAllDisk,
+      int numAllArchive, int numRamDisk, long diskCapacity,
+      long archiveCapacity, long ramDiskCapacity) {
+    final long[][] capacities = new long[nDatanodes][];
+    int i = 0;
+    for (; i < numRamDisk; i++) {
+      capacities[i] = new long[]{ramDiskCapacity, diskCapacity};
+    }
+    for (; i < numRamDisk + numAllDisk; i++) {
+      capacities[i] = new long[]{diskCapacity, diskCapacity};
+    }
+    for (; i < numRamDisk + numAllDisk + numAllArchive; i++) {
+      capacities[i] = new long[]{archiveCapacity, archiveCapacity};
+    }
+    for(; i < capacities.length; i++) {
+      capacities[i] = new long[]{diskCapacity, archiveCapacity};
+    }
+    return capacities;
+  }
+
   private static class PathPolicyMap {
     final Map<Path, BlockStoragePolicy> map = Maps.newHashMap();
     final Path hot = new Path("/hot");
@@ -748,4 +783,56 @@ public class TestStorageMover {
       test.shutdownCluster();
     }
   }
+
+  /**
+   * Test blocks of lazy_persist file on RAM_DISK will not be moved to other
+   * storage types by the Storage Mover.
+   */
+  @Test
+  public void testRamDiskNotMoved() throws Exception {
+    LOG.info("testRamDiskNotMoved");
+    final PathPolicyMap pathPolicyMap = new PathPolicyMap(0);
+    final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme();
+
+    final long diskCapacity = 100 * BLOCK_SIZE;
+    final long archiveCapacity = (6 + HdfsConstants.MIN_BLOCKS_FOR_WRITE)
+      * BLOCK_SIZE;
+    final long ramDiskCapacity = 10 * BLOCK_SIZE;
+    final long[][] capacities = genCapacities(1, 0, 0, 1,
+      diskCapacity, archiveCapacity, ramDiskCapacity);
+    final int LAZY_WRITER_INTERVAL_SEC = 1;
+    final ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
+      1, (short)1, genStorageTypes(1, 0, 0, 1), capacities);
+    clusterScheme.conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
+      LAZY_WRITER_INTERVAL_SEC);
+    final MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
+
+    try {
+      test.runBasicTest(false);
+
+      // test creating a hot RAM_DISK file
+      final int SEED = 0xFADED;
+      final Path foo_hot = new Path(pathPolicyMap.hot, "foo_hot");
+      DFSTestUtil.createFile(test.dfs, foo_hot, true, BLOCK_SIZE, BLOCK_SIZE,
+        BLOCK_SIZE, (short) 1, SEED, true);
+      Assert.assertTrue(DFSTestUtil.verifyFileReplicasOnStorageType(test.dfs,
+        test.dfs.getClient(), foo_hot, StorageType.RAM_DISK));
+
+     // Sleep for a short time to allow the lazy writer thread to do its job
+      Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+      // Verify policy related name change is allowed
+      final Path foo_hot_new = new Path(pathPolicyMap.warm, "foo_hot");
+      test.dfs.rename(foo_hot, pathPolicyMap.warm);
+      Assert.assertTrue(test.dfs.exists(foo_hot_new));
+
+      // Verify blocks on ram disk will not be moved to other storage types by
+      // policy based Storage Mover.
+      test.migrate();
+      Assert.assertTrue(DFSTestUtil.verifyFileReplicasOnStorageType(test.dfs,
+        test.dfs.getClient(), foo_hot_new, StorageType.RAM_DISK));
+    } finally {
+      test.shutdownCluster();
+    }
+  }
 }