HDFS-11163. Mover should move the file blocks to default storage once policy is unset. Contributed by Surendra Singh Lilhore.

(cherry picked from commit 00ed21a6fedb45a7c8992b8d45adaa83f14af34c)
Chris Nauroth, 8 years ago
parent commit d5e2bd4096
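
In short: before this patch the Mover skipped any file whose storage policy was BLOCK_STORAGE_POLICY_ID_UNSPECIFIED, so blocks migrated to ARCHIVE under a COLD policy stayed there even after the policy was unset. The patch makes the namenode advertise its default storage policy id through FsServerDefaults and teaches the Mover to fall back to it. A minimal sketch of the resulting behavior, condensed from the new TestMover case at the bottom of this diff (assumes a cluster whose datanodes carry both DISK and ARCHIVE storage):

    DistributedFileSystem dfs = cluster.getFileSystem();
    Path file = new Path("/demo");
    DFSTestUtil.createFile(dfs, file, 1L, (short) 1, 0L);  // lands on DISK

    dfs.setStoragePolicy(file, "COLD");                    // target: ARCHIVE
    ToolRunner.run(conf, new Mover.Cli(), new String[] {"-p", file.toString()});

    dfs.unsetStoragePolicy(file);                          // policy unspecified again
    // Previously the Mover ignored the file at this point; with this patch it
    // resolves the namenode's default policy (HOT) and moves blocks back to DISK.
    ToolRunner.run(conf, new Mover.Cli(), new String[] {"-p", file.toString()});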

+ 19 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsServerDefaults.java

@@ -55,6 +55,7 @@ public class FsServerDefaults implements Writable {
   private long trashInterval;
   private DataChecksum.Type checksumType;
   private String keyProviderUri;
+  private byte storagepolicyId;
 
   public FsServerDefaults() {
   }
@@ -62,8 +63,17 @@ public class FsServerDefaults implements Writable {
   public FsServerDefaults(long blockSize, int bytesPerChecksum,
       int writePacketSize, short replication, int fileBufferSize,
       boolean encryptDataTransfer, long trashInterval,
-      DataChecksum.Type checksumType,
-      String keyProviderUri) {
+      DataChecksum.Type checksumType, String keyProviderUri) {
+    this(blockSize, bytesPerChecksum, writePacketSize, replication,
+        fileBufferSize, encryptDataTransfer, trashInterval, checksumType,
+        keyProviderUri, (byte) 0);
+  }
+
+  public FsServerDefaults(long blockSize, int bytesPerChecksum,
+      int writePacketSize, short replication, int fileBufferSize,
+      boolean encryptDataTransfer, long trashInterval,
+      DataChecksum.Type checksumType, String keyProviderUri,
+      byte storagepolicy) {
     this.blockSize = blockSize;
     this.bytesPerChecksum = bytesPerChecksum;
     this.writePacketSize = writePacketSize;
@@ -73,6 +83,7 @@ public class FsServerDefaults implements Writable {
     this.trashInterval = trashInterval;
     this.checksumType = checksumType;
     this.keyProviderUri = keyProviderUri;
+    this.storagepolicyId = storagepolicy;
   }
 
   public long getBlockSize() {
@@ -115,6 +126,10 @@ public class FsServerDefaults implements Writable {
     return keyProviderUri;
   }
 
+  public byte getDefaultStoragePolicyId() {
+    return storagepolicyId;
+  }
+
   // /////////////////////////////////////////
   // Writable
   // /////////////////////////////////////////
@@ -127,6 +142,7 @@ public class FsServerDefaults implements Writable {
     out.writeShort(replication);
     out.writeInt(fileBufferSize);
     WritableUtils.writeEnum(out, checksumType);
+    out.writeByte(storagepolicyId);
   }
 
   @Override
@@ -138,5 +154,6 @@ public class FsServerDefaults implements Writable {
     replication = in.readShort();
     fileBufferSize = in.readInt();
     checksumType = WritableUtils.readEnum(in, DataChecksum.Type.class);
+    storagepolicyId = in.readByte();
   }
 }
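
The new ten-argument constructor keeps the old nine-argument signature source-compatible by delegating with policy id 0 (unspecified), and write()/readFields() carry the extra byte so the Writable form stays in sync. Clients reach the id through the ordinary server-defaults call; a minimal self-contained sketch (the class name ServerDefaultsProbe is illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FsServerDefaults;
    import org.apache.hadoop.fs.Path;

    public class ServerDefaultsProbe {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); // assumes fs.defaultFS points at HDFS
        FileSystem fs = FileSystem.get(conf);
        FsServerDefaults d = fs.getServerDefaults(new Path("/"));
        // 7 (HOT) on a stock namenode with this patch; 0 from older namenodes
        System.out.println("default storage policy id = "
            + d.getDefaultStoragePolicyId());
      }
    }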

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java

@@ -1619,7 +1619,8 @@ public class PBHelperClient {
         fs.getEncryptDataTransfer(),
         fs.getTrashInterval(),
         convert(fs.getChecksumType()),
-        fs.hasKeyProviderUri() ? fs.getKeyProviderUri() : null);
+        fs.hasKeyProviderUri() ? fs.getKeyProviderUri() : null,
+        (byte) fs.getPolicyId());
   }
 
   public static List<CryptoProtocolVersionProto> convert(
@@ -1771,6 +1772,7 @@ public class PBHelperClient {
         .setTrashInterval(fs.getTrashInterval())
         .setChecksumType(convert(fs.getChecksumType()))
         .setKeyProviderUri(fs.getKeyProviderUri())
+        .setPolicyId(fs.getDefaultStoragePolicyId())
         .build();
   }
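
Both directions of the protobuf conversion now carry the id, so a round-trip preserves it. A hedged sketch (the values are arbitrary; note the empty rather than null keyProviderUri, since convert() passes it straight to setKeyProviderUri()):

    FsServerDefaults in = new FsServerDefaults(
        128L << 20, 512, 64 << 10, (short) 3, 4096,
        false, 0L, DataChecksum.Type.CRC32C, /* keyProviderUri */ "", (byte) 7);
    FsServerDefaultsProto proto = PBHelperClient.convert(in);
    assert PBHelperClient.convert(proto).getDefaultStoragePolicyId() == 7;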
 

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto

@@ -378,6 +378,7 @@ message FsServerDefaultsProto {
   optional uint64 trashInterval = 7 [default = 0];
   optional ChecksumTypeProto checksumType = 8 [default = CHECKSUM_CRC32];
   optional string keyProviderUri = 9;
+  optional uint32 policyId = 10 [default = 0];
 }
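
Declaring the field optional with [default = 0] keeps the wire format compatible in both directions: an old namenode simply never writes field 10, and a new client decodes the message with policyId == 0, which matches BLOCK_STORAGE_POLICY_ID_UNSPECIFIED. A hedged sketch (FsServerDefaultsProto's first five fields are required, so they must be set before build()):

    FsServerDefaultsProto old = FsServerDefaultsProto.newBuilder()
        .setBlockSize(128L << 20)
        .setBytesPerChecksum(512)
        .setWritePacketSize(64 << 10)
        .setReplication(3)
        .setFileBufferSize(4096)
        .build();                  // field 10 never set, as from an old namenode
    assert !old.hasPolicyId();
    assert old.getPolicyId() == 0; // declared default applies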
 
 

+ 8 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java

@@ -351,10 +351,15 @@ public class Mover {
     /** @return true if it is necessary to run another round of migration */
     private void processFile(String fullPath, HdfsLocatedFileStatus status,
         Result result) {
-      final byte policyId = status.getStoragePolicy();
-      // currently we ignore files with unspecified storage policy
+      byte policyId = status.getStoragePolicy();
       if (policyId == HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED) {
-        return;
+        try {
+          // get default policy from namenode
+          policyId = dfs.getServerDefaults().getDefaultStoragePolicyId();
+        } catch (IOException e) {
+          LOG.warn("Failed to get default policy for " + fullPath, e);
+          return;
+        }
       }
       final BlockStoragePolicy policy = blockStoragePolicies[policyId];
       if (policy == null) {
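
The fallback also degrades gracefully when the Mover runs against a namenode that predates this change: the proto default of 0 flows through getDefaultStoragePolicyId(), blockStoragePolicies[0] is empty, and the pre-existing policy == null check keeps the old skip behavior. A hedged sketch of that path:

    byte policyId = dfs.getServerDefaults().getDefaultStoragePolicyId(); // 0 from old NN
    BlockStoragePolicy policy = blockStoragePolicies[policyId];          // null
    if (policy == null) {
      return; // same outcome as before the patch: the file is skipped
    }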

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -767,8 +767,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           conf.getLong(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT),
           checksumType,
           conf.getTrimmed(
-          CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
-          ""));
+              CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+              ""),
+          blockManager.getStoragePolicySuite().getDefaultPolicy().getId());
 
       this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY, 
                                        DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
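
The id advertised here comes from the block manager's BlockStoragePolicySuite; in the stock suite the default policy is HOT with id 7, which is why the TestFileCreation assertion below expects 7. A hedged sketch:

    BlockStoragePolicySuite suite = BlockStoragePolicySuite.createDefaultSuite();
    BlockStoragePolicy def = suite.getDefaultPolicy();
    assert "HOT".equals(def.getName());
    assert def.getId() == 7; // stock ids: 15 LAZY_PERSIST, 12 ALL_SSD,
                             // 10 ONE_SSD, 7 HOT, 5 WARM, 2 COLD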

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java

@@ -171,6 +171,7 @@ public class TestFileCreation {
       assertEquals(DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT, serverDefaults.getWritePacketSize());
       assertEquals(DFS_REPLICATION_DEFAULT + 1, serverDefaults.getReplication());
       assertEquals(IO_FILE_BUFFER_SIZE_DEFAULT, serverDefaults.getFileBufferSize());
+      assertEquals(7, serverDefaults.getDefaultStoragePolicyId());
     } finally {
       fs.close();
       cluster.shutdown();

+ 74 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java

@@ -22,7 +22,9 @@ import java.net.URI;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.base.Supplier;
 import com.google.common.collect.Maps;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
@@ -45,9 +47,12 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestMover {
 
+  private static final Logger LOG = LoggerFactory.getLogger(TestMover.class);
   static final int DEFAULT_BLOCK_SIZE = 100;
 
   static {
@@ -409,4 +414,73 @@ public class TestMover {
       cluster.shutdown();
     }
   }
+
+  @Test(timeout = 300000)
+  public void testMoverWhenStoragePolicyUnset() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1)
+        .storageTypes(
+            new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE}})
+        .build();
+    try {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      final String file = "/testMoverWhenStoragePolicyUnset";
+      // write to DISK
+      DFSTestUtil.createFile(dfs, new Path(file), 1L, (short) 1, 0L);
+
+      // move to ARCHIVE
+      dfs.setStoragePolicy(new Path(file), "COLD");
+      int rc = ToolRunner.run(conf, new Mover.Cli(),
+          new String[] {"-p", file.toString()});
+      Assert.assertEquals("Movement to ARCHIVE should be successful", 0, rc);
+
+      // Wait till namenode notified about the block location details
+      waitForLocatedBlockWithArchiveStorageType(dfs, file, 1);
+
+      // verify before unset policy
+      LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+      Assert.assertTrue(StorageType.ARCHIVE == (lb.getStorageTypes())[0]);
+
+      // unset storage policy
+      dfs.unsetStoragePolicy(new Path(file));
+      rc = ToolRunner.run(conf, new Mover.Cli(),
+          new String[] {"-p", file.toString()});
+      Assert.assertEquals("Movement to DISK should be successful", 0, rc);
+
+      lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+      Assert.assertTrue(StorageType.DISK == (lb.getStorageTypes())[0]);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  private void waitForLocatedBlockWithArchiveStorageType(
+      final DistributedFileSystem dfs, final String file,
+      final int expectedArchiveCount) throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        LocatedBlock lb = null;
+        try {
+          lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+        } catch (IOException e) {
+          LOG.error("Exception while getting located blocks", e);
+          return false;
+        }
+        int archiveCount = 0;
+        for (StorageType storageType : lb.getStorageTypes()) {
+          if (StorageType.ARCHIVE == storageType) {
+            archiveCount++;
+          }
+        }
+        LOG.info("Archive replica count, expected={} and actual={}",
+            expectedArchiveCount, archiveCount);
+        return expectedArchiveCount == archiveCount;
+      }
+    }, 100, 3000);
+  }
+
 }