Преглед изворни кода

HDFS-6835. Archival Storage: Add a new API to set storage policy. Contributed by Jing Zhao.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-6584@1617568 13f79535-47bb-0310-9956-ffa450edef68
Jing Zhao пре 10 година
родитељ
комит
37207b75d4
17 измењених фајлова са 390 додато и 8 уклоњено
  1. 15 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java
  2. 19 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  3. 27 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
  4. 14 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
  5. 16 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
  6. 16 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
  7. 5 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
  8. 22 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  9. 11 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
  10. 8 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
  11. 67 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
  12. 1 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
  13. 47 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  14. 11 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
  15. 7 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
  16. 10 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
  17. 94 4
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java

+ 15 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java

@@ -68,6 +68,17 @@ public class BlockStoragePolicy {
     public BlockStoragePolicy getDefaultPolicy() {
       return getPolicy(defaultPolicyID);
     }
+
+    public BlockStoragePolicy getPolicy(String policyName) {
+      if (policies != null) {
+        for (BlockStoragePolicy policy : policies) {
+          if (policy != null && policy.name.equals(policyName)) {
+            return policy;
+          }
+        }
+      }
+      return null;
+    }
   }
 
   /** A 4-bit policy ID */
@@ -172,6 +183,10 @@ public class BlockStoragePolicy {
         + ", replicationFallbacks=" + Arrays.asList(replicationFallbacks);
   }
 
+  public byte getId() {
+    return id;
+  }
+
   private static StorageType getFallback(EnumSet<StorageType> unavailables,
       StorageType[] fallbacks) {
     for(StorageType fb : fallbacks) {

+ 19 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -1645,6 +1645,25 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     }
   }
 
+  /**
+   * Set storage policy for an existing file
+   * @param src file name
+   * @param policyName name of the storage policy
+   */
+  public void setStoragePolicy(String src, String policyName)
+      throws IOException {
+    try {
+      namenode.setStoragePolicy(src, policyName);
+    } catch (RemoteException e) {
+      throw e.unwrapRemoteException(AccessControlException.class,
+                                    FileNotFoundException.class,
+                                    SafeModeException.class,
+                                    NSQuotaExceededException.class,
+                                    UnresolvedPathException.class,
+                                    SnapshotAccessControlException.class);
+    }
+  }
+
   /**
    * Rename file or directory.
    * @see ClientProtocol#rename(String, String)

+ 27 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -461,7 +461,33 @@ public class DistributedFileSystem extends FileSystem {
       }
     }.resolve(this, absF);
   }
-  
+
+  public void setStoragePolicy(final Path src, final String policyName)
+      throws IOException {
+    statistics.incrementWriteOps(1);
+    Path absF = fixRelativePart(src);
+    new FileSystemLinkResolver<Void>() {
+      @Override
+      public Void doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        dfs.setStoragePolicy(getPathName(p), policyName);
+        return null;
+      }
+      @Override
+      public Void next(final FileSystem fs, final Path p)
+          throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          ((DistributedFileSystem) fs).setStoragePolicy(p, policyName);
+          return null;
+        } else {
+          throw new UnsupportedOperationException(
+              "Cannot perform setStoragePolicy on a non-DistributedFileSystem: "
+                  + src + " -> " + p);
+        }
+      }
+    }.resolve(this, absF);
+  }
+
   /**
    * Move blocks from srcs to trg and delete srcs afterwards.
    * The file block sizes must be the same.

+ 14 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

@@ -254,6 +254,20 @@ public interface ClientProtocol {
       FileNotFoundException, SafeModeException, UnresolvedLinkException,
       SnapshotAccessControlException, IOException;
 
+  /**
+   * Set the storage policy for an existing file
+   * @param src Path of an existing file. 
+   * @param policyName The name of the storage policy
+   * @throws SnapshotAccessControlException If access is denied
+   * @throws UnresolvedLinkException if <code>src</code> contains a symlink
+   * @throws FileNotFoundException If file/dir <code>src</code> is not found
+   * @throws QuotaExceededException If changes violate the quota restriction
+   */
+  @Idempotent
+  public void setStoragePolicy(String src, String policyName)
+      throws SnapshotAccessControlException, UnresolvedLinkException,
+      FileNotFoundException, QuotaExceededException, IOException;
+
   /**
    * Set permissions for an existing file/directory.
    * 

+ 16 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java

@@ -168,6 +168,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetRep
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
@@ -225,6 +227,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
   static final GetSnapshottableDirListingResponseProto 
       NULL_GET_SNAPSHOTTABLE_DIR_LISTING_RESPONSE = 
       GetSnapshottableDirListingResponseProto.newBuilder().build();
+  static final SetStoragePolicyResponseProto VOID_SET_STORAGE_POLICY_RESPONSE =
+      SetStoragePolicyResponseProto.newBuilder().build();
 
   private static final CreateResponseProto VOID_CREATE_RESPONSE = 
   CreateResponseProto.newBuilder().build();
@@ -1354,4 +1358,16 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
     }
     return VOID_CHECKACCESS_RESPONSE;
   }
+
+  @Override
+  public SetStoragePolicyResponseProto setStoragePolicy(
+      RpcController controller, SetStoragePolicyRequestProto request)
+      throws ServiceException {
+    try {
+      server.setStoragePolicy(request.getSrc(), request.getPolicyName());
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return VOID_SET_STORAGE_POLICY_RESPONSE;
+  }
 }

+ 16 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java

@@ -60,7 +60,9 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
+import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusRequestProto;
@@ -146,6 +148,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTim
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto;
@@ -1359,4 +1362,17 @@ public class ClientNamenodeProtocolTranslatorPB implements
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+
+  @Override
+  public void setStoragePolicy(String src, String policyName)
+      throws SnapshotAccessControlException, UnresolvedLinkException,
+      FileNotFoundException, QuotaExceededException, IOException {
+    SetStoragePolicyRequestProto req = SetStoragePolicyRequestProto
+        .newBuilder().setSrc(src).setPolicyName(policyName).build();
+    try {
+      rpcProxy.setStoragePolicy(null, req);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
 }

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -395,7 +395,11 @@ public class BlockManager {
           lifetimeMin*60*1000L, 0, null, encryptionAlgorithm);
     }
   }
-  
+
+  public BlockStoragePolicy getStoragePolicy(final String policyName) {
+    return storagePolicySuite.getPolicy(policyName);
+  }
+
   public void setBlockPoolId(String blockPoolId) {
     if (isBlockTokenEnabled()) {
       blockTokenSecretManager.setBlockPoolId(blockPoolId);

+ 22 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -946,6 +946,28 @@ public class FSDirectory implements Closeable {
     return file.getBlocks();
   }
 
+  /** Set block storage policy for a file */
+  void setStoragePolicy(String src, byte policyId)
+      throws SnapshotAccessControlException, UnresolvedLinkException,
+      FileNotFoundException, QuotaExceededException {
+    writeLock();
+    try {
+      unprotectedSetStoragePolicy(src, policyId);
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  void unprotectedSetStoragePolicy(String src, byte policyId)
+      throws SnapshotAccessControlException, UnresolvedLinkException,
+      FileNotFoundException, QuotaExceededException {
+    assert hasWriteLock();
+    final INodesInPath iip = getINodesInPath4Write(src, true);
+    // TODO: currently we only support setting storage policy on a file
+    final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
+    inode.setStoragePolicyID(policyId, iip.getLatestSnapshotId());
+  }
+
   /**
    * @param path the file path
    * @return the block size of the file. 

+ 11 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

@@ -82,6 +82,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetReplicationOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetStoragePolicyOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
@@ -812,7 +813,16 @@ public class FSEditLog implements LogsPurgeable {
       .setReplication(replication);
     logEdit(op);
   }
-  
+
+  /** 
+   * Add set storage policy id record to edit log
+   */
+  void logSetStoragePolicy(String src, byte policyId) {
+    SetStoragePolicyOp op = SetStoragePolicyOp.getInstance(cache.get())
+        .setPath(src).setPolicyId(policyId);
+    logEdit(op);
+  }
+
   /** Add set namespace quota record to edit log
    * 
    * @param src the string representation of the path to a directory

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java

@@ -78,6 +78,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetReplicationOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetStoragePolicyOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveXAttrOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
@@ -827,6 +828,13 @@ public class FSEditLogLoader {
       }
       break;
     }
+    case OP_SET_STORAGE_POLICY: {
+      SetStoragePolicyOp setStoragePolicyOp = (SetStoragePolicyOp) op;
+      fsDir.unprotectedSetStoragePolicy(
+          renameReservedPathsOnUpgrade(setStoragePolicyOp.path, logVersion),
+          setStoragePolicyOp.policyId);
+      break;
+    }
     default:
       throw new IOException("Invalid operation read " + op.opCode);
     }

+ 67 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java

@@ -61,6 +61,7 @@ import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SYMLINK
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_TIMES;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_BLOCKS;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_MASTER_KEY;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_STORAGE_POLICY;
 
 import java.io.DataInput;
 import java.io.DataInputStream;
@@ -193,6 +194,7 @@ public abstract class FSEditLogOp {
           OP_ROLLING_UPGRADE_FINALIZE, "finalize"));
       inst.put(OP_SET_XATTR, new SetXAttrOp());
       inst.put(OP_REMOVE_XATTR, new RemoveXAttrOp());
+      inst.put(OP_SET_STORAGE_POLICY, new SetStoragePolicyOp());
     }
     
     public FSEditLogOp get(FSEditLogOpCodes opcode) {
@@ -3780,6 +3782,71 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /** {@literal @Idempotent} for {@link ClientProtocol#setStoragePolicy} */
+  static class SetStoragePolicyOp extends FSEditLogOp {
+    String path;
+    byte policyId;
+
+    private SetStoragePolicyOp() {
+      super(OP_SET_STORAGE_POLICY);
+    }
+
+    static SetStoragePolicyOp getInstance(OpInstanceCache cache) {
+      return (SetStoragePolicyOp) cache.get(OP_SET_STORAGE_POLICY);
+    }
+
+    SetStoragePolicyOp setPath(String path) {
+      this.path = path;
+      return this;
+    }
+
+    SetStoragePolicyOp setPolicyId(byte policyId) {
+      this.policyId = policyId;
+      return this;
+    }
+
+    @Override
+    public void writeFields(DataOutputStream out) throws IOException {
+      FSImageSerialization.writeString(path, out);
+      out.writeByte(policyId);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      this.path = FSImageSerialization.readString(in);
+      this.policyId = in.readByte();
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("SetStoragePolicyOp [path=");
+      builder.append(path);
+      builder.append(", policyId=");
+      builder.append(policyId);
+      builder.append(", opCode=");
+      builder.append(opCode);
+      builder.append(", txid=");
+      builder.append(txid);
+      builder.append("]");
+      return builder.toString();
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      XMLUtils.addSaxString(contentHandler, "PATH", path);
+      XMLUtils.addSaxString(contentHandler, "POLICYID",
+          Byte.valueOf(policyId).toString());
+    }
+
+    @Override
+    void fromXml(Stanza st) throws InvalidXmlException {
+      this.path = st.getValue("PATH");
+      this.policyId = Byte.valueOf(st.getValue("POLICYID"));
+    }
+  }  
+
   /**
    * Class for writing editlog ops
    */

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java

@@ -72,6 +72,7 @@ public enum FSEditLogOpCodes {
   OP_ROLLING_UPGRADE_FINALIZE   ((byte) 42),
   OP_SET_XATTR                  ((byte) 43),
   OP_REMOVE_XATTR               ((byte) 44),
+  OP_SET_STORAGE_POLICY         ((byte) 45),
 
   // Note that the current range of the valid OP code is 0~127
   OP_INVALID                    ((byte) -1);

+ 47 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -154,6 +154,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.ServiceFailedException;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
@@ -2219,6 +2220,52 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return isFile;
   }
 
+  /**
+   * Set the storage policy for an existing file.
+   *
+   * @param src file name
+   * @param policyName storage policy name
+   */
+  void setStoragePolicy(String src, final String policyName)
+      throws IOException {
+    try {
+      setStoragePolicyInt(src, policyName);
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "setStoragePolicy", src);
+      throw e;
+    }
+  }
+
+  private void setStoragePolicyInt(String src, final String policyName)
+      throws IOException {
+    checkSuperuserPrivilege();
+    checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    waitForLoadingFSImage();
+    HdfsFileStatus fileStat;
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      checkNameNodeSafeMode("Cannot set storage policy for " + src);
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
+
+      // get the corresponding policy and make sure the policy name is valid
+      BlockStoragePolicy policy = blockManager.getStoragePolicy(policyName);
+      if (policy == null) {
+        throw new HadoopIllegalArgumentException(
+            "Cannot find a block policy with the name " + policyName);
+      }
+      dir.setStoragePolicy(src, policy.getId());
+      getEditLog().logSetStoragePolicy(src, policy.getId());
+      fileStat = getAuditFileInfo(src, false);
+    } finally {
+      writeUnlock();
+    }
+
+    getEditLog().logSync();
+    logAuditEvent(true, "setStoragePolicy", src, null, fileStat);
+  }
+
   long getPreferredBlockSize(String filename) 
       throws IOException, UnresolvedLinkException {
     FSPermissionChecker pc = getPermissionChecker();

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java

@@ -372,6 +372,17 @@ public class INodeFile extends INodeWithAdditionalFields
     return HeaderFormat.getStoragePolicyID(header);
   }
 
+  /** Set the policy id of the file */
+  public final void setStoragePolicyID(byte policyId) {
+    header = HeaderFormat.STORAGE_POLICY_ID.BITS.combine(policyId, header);
+  }
+
+  public final void setStoragePolicyID(byte policyId, int lastSnapshotId)
+      throws QuotaExceededException {
+    recordModification(lastSnapshotId);
+    setStoragePolicyID(policyId);
+  }
+
   @Override
   public long getHeaderLong() {
     return header;

+ 7 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -579,7 +579,13 @@ class NameNodeRpcServer implements NamenodeProtocols {
     throws IOException {  
     return namesystem.setReplication(src, replication);
   }
-    
+
+  @Override
+  public void setStoragePolicy(String src, String policyName)
+      throws IOException {
+    namesystem.setStoragePolicy(src, policyName);
+  }
+
   @Override // ClientProtocol
   public void setPermission(String src, FsPermission permissions)
       throws IOException {

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto

@@ -97,6 +97,14 @@ message SetReplicationResponseProto {
   required bool result = 1;
 }
 
+message SetStoragePolicyRequestProto {
+  required string src = 1;
+  required string policyName = 2;
+}
+
+message SetStoragePolicyResponseProto { // void response
+}
+
 message SetPermissionRequestProto {
   required string src = 1;
   required FsPermissionProto permission = 2;
@@ -671,6 +679,8 @@ service ClientNamenodeProtocol {
   rpc append(AppendRequestProto) returns(AppendResponseProto);
   rpc setReplication(SetReplicationRequestProto)
       returns(SetReplicationResponseProto);
+  rpc setStoragePolicy(SetStoragePolicyRequestProto)
+      returns(SetStoragePolicyResponseProto);
   rpc setPermission(SetPermissionRequestProto)
       returns(SetPermissionResponseProto);
   rpc setOwner(SetOwnerRequestProto) returns(SetOwnerResponseProto);

+ 94 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java

@@ -17,12 +17,19 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.io.FileNotFoundException;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -30,8 +37,10 @@ import org.junit.Test;
 public class TestBlockStoragePolicy {
   public static final BlockStoragePolicy.Suite POLICY_SUITE;
   public static final BlockStoragePolicy DEFAULT_STORAGE_POLICY;
+  public static final Configuration conf;
+
   static {
-    final Configuration conf = new HdfsConfiguration();
+    conf = new HdfsConfiguration();
     POLICY_SUITE = BlockStoragePolicy.readBlockStorageSuite(conf);
     DEFAULT_STORAGE_POLICY = POLICY_SUITE.getDefaultPolicy();
   }
@@ -41,11 +50,15 @@ public class TestBlockStoragePolicy {
   static final EnumSet<StorageType> disk = EnumSet.of(StorageType.DISK);
   static final EnumSet<StorageType> both = EnumSet.of(StorageType.DISK, StorageType.ARCHIVE);
 
+  static final long FILE_LEN = 1024;
+  static final short REPLICATION = 3;
+
+  static final byte COLD = (byte) 4;
+  static final byte WARM = (byte) 8;
+  static final byte HOT  = (byte) 12;
+
   @Test
   public void testDefaultPolicies() throws Exception {
-    final byte COLD = (byte)4; 
-    final byte WARM = (byte)8; 
-    final byte HOT  = (byte)12; 
     final Map<Byte, String> expectedPolicyStrings = new HashMap<Byte, String>();
     expectedPolicyStrings.put(COLD,
         "BlockStoragePolicy{COLD:4, storageTypes=[ARCHIVE], creationFallbacks=[], replicationFallbacks=[]");
@@ -119,4 +132,81 @@ public class TestBlockStoragePolicy {
     Assert.assertEquals(diskExpected, policy.getReplicationFallback(disk));
     Assert.assertEquals(null, policy.getReplicationFallback(both));
   }
+
+  @Test
+  public void testSetStoragePolicy() throws Exception {
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(REPLICATION).build();
+    cluster.waitActive();
+    FSDirectory fsdir = cluster.getNamesystem().getFSDirectory();
+    final DistributedFileSystem fs = cluster.getFileSystem();
+    try {
+      final Path dir = new Path("/testSetStoragePolicy");
+      final Path fooFile = new Path(dir, "foo");
+      final Path barDir = new Path(dir, "bar");
+      final Path barFile1= new Path(barDir, "f1");
+      final Path barFile2= new Path(barDir, "f2");
+      DFSTestUtil.createFile(fs, fooFile, FILE_LEN, REPLICATION, 0L);
+      DFSTestUtil.createFile(fs, barFile1, FILE_LEN, REPLICATION, 0L);
+      DFSTestUtil.createFile(fs, barFile2, FILE_LEN, REPLICATION, 0L);
+
+      final String invalidPolicyName = "INVALID-POLICY";
+      try {
+        fs.setStoragePolicy(fooFile, invalidPolicyName);
+        Assert.fail("Should throw a HadoopIllegalArgumentException");
+      } catch (RemoteException e) {
+        GenericTestUtils.assertExceptionContains(invalidPolicyName, e);
+      }
+
+      // check internal status
+      INodeFile fooFileNode = fsdir.getINode4Write(fooFile.toString()).asFile();
+      INodeFile barFile1Node = fsdir.getINode4Write(barFile1.toString()).asFile();
+      INodeFile barFile2Node = fsdir.getINode4Write(barFile2.toString()).asFile();
+
+      final Path invalidPath = new Path("/invalidPath");
+      try {
+        fs.setStoragePolicy(invalidPath, "WARM");
+        Assert.fail("Should throw a FileNotFoundException");
+      } catch (FileNotFoundException e) {
+        GenericTestUtils.assertExceptionContains(invalidPath.toString(), e);
+      }
+
+      fs.setStoragePolicy(fooFile, "COLD");
+      fs.setStoragePolicy(barFile1, "WARM");
+      fs.setStoragePolicy(barFile2, "WARM");
+      // TODO: set storage policy on a directory
+
+      // check internal status
+      Assert.assertEquals(COLD, fooFileNode.getStoragePolicyID());
+      Assert.assertEquals(WARM, barFile1Node.getStoragePolicyID());
+      Assert.assertEquals(WARM, barFile2Node.getStoragePolicyID());
+
+      // restart namenode to make sure the editlog is correct
+      cluster.restartNameNode(true);
+      fsdir = cluster.getNamesystem().getFSDirectory();
+      fooFileNode = fsdir.getINode4Write(fooFile.toString()).asFile();
+      Assert.assertEquals(COLD, fooFileNode.getStoragePolicyID());
+      barFile1Node = fsdir.getINode4Write(barFile1.toString()).asFile();
+      Assert.assertEquals(WARM, barFile1Node.getStoragePolicyID());
+      barFile2Node = fsdir.getINode4Write(barFile2.toString()).asFile();
+      Assert.assertEquals(WARM, barFile2Node.getStoragePolicyID());
+
+      // restart namenode with checkpoint to make sure the fsimage is correct
+      fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      fs.saveNamespace();
+      fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+      cluster.restartNameNode(true);
+      fsdir = cluster.getNamesystem().getFSDirectory();
+      fooFileNode = fsdir.getINode4Write(fooFile.toString()).asFile();
+      Assert.assertEquals(COLD, fooFileNode.getStoragePolicyID());
+      barFile1Node = fsdir.getINode4Write(barFile1.toString()).asFile();
+      Assert.assertEquals(WARM, barFile1Node.getStoragePolicyID());
+      barFile2Node = fsdir.getINode4Write(barFile2.toString()).asFile();
+      Assert.assertEquals(WARM, barFile2Node.getStoragePolicyID());
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }