Quellcode durchsuchen

HDFS-7720. Quota by Storage Type API, tools and ClientNameNode Protocol changes. (Contributed by Xiaoyu Yao)

Arpit Agarwal vor 10 Jahren
Ursprung
Commit
8fa0fc9574
14 geänderte Dateien mit 251 neuen und 27 gelöschten Zeilen
  1. 3 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  2. 32 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  3. 3 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  4. 30 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
  5. 27 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java
  6. 27 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
  7. 6 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
  8. 56 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaByStorageTypeExceededException.java
  9. 3 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
  10. 10 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
  11. 7 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
  12. 32 8
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
  13. 1 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
  14. 14 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -27,6 +27,9 @@ Release 2.7.0 - UNRELEASED
 
     HDFS-7655. Expose truncate API for Web HDFS. (yliu)
 
+    HDFS-7720. Quota by Storage Type API, tools and ClientNameNode Protocol
+    changes. (Xiaoyu Yao via Arpit Agarwal)
+
   IMPROVEMENTS
 
     HDFS-7055. Add tracing to DFSInputStream (cmccabe)

+ 32 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -168,6 +168,7 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
@@ -3014,7 +3015,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
 
   /**
    * Sets or resets quotas for a directory.
-   * @see ClientProtocol#setQuota(String, long, long)
+   * @see ClientProtocol#setQuota(String, long, long, StorageType)
    */
   void setQuota(String src, long namespaceQuota, long diskspaceQuota) 
       throws IOException {
@@ -3030,7 +3031,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     }
     TraceScope scope = getPathTraceScope("setQuota", src);
     try {
-      namenode.setQuota(src, namespaceQuota, diskspaceQuota);
+      // Pass null as storage type for traditional space/namespace quota.
+      namenode.setQuota(src, namespaceQuota, diskspaceQuota, null);
     } catch(RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
                                      FileNotFoundException.class,
@@ -3043,6 +3045,34 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     }
   }
 
+  /**
+   * Sets or resets quotas by storage type for a directory.
+   * @see ClientProtocol#setQuota(String, long, long, StorageType)
+   */
+  void setQuotaByStorageType(String src, StorageType type, long spaceQuota)
+      throws IOException {
+    if (spaceQuota <= 0 && spaceQuota != HdfsConstants.QUOTA_DONT_SET &&
+        spaceQuota != HdfsConstants.QUOTA_RESET) {
+      throw new IllegalArgumentException("Invalid values for quota :" +
+        spaceQuota);
+    }
+    if (type == null) {
+      throw new IllegalArgumentException("Invalid storage type(null)");
+    }
+    if (!type.supportTypeQuota()) {
+      throw new IllegalArgumentException("Don't support Quota for storage type : "
+        + type.toString());
+    }
+    try {
+      namenode.setQuota(src, HdfsConstants.QUOTA_DONT_SET, spaceQuota, type);
+    } catch (RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+        FileNotFoundException.class,
+        QuotaByStorageTypeExceededException.class,
+        UnresolvedPathException.class,
+        SnapshotAccessControlException.class);
+    }
+  }
   /**
    * set the modification and access time of a file
    * 

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -611,6 +611,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_STORAGE_POLICY_ENABLED_KEY = "dfs.storage.policy.enabled";
   public static final boolean DFS_STORAGE_POLICY_ENABLED_DEFAULT = true;
 
+  public static final String  DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY = "dfs.quota.by.storage.type.enabled";
+  public static final boolean DFS_QUOTA_BY_STORAGETYPE_ENABLED_DEFAULT = true;
+
   // HA related configuration
   public static final String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes";
   public static final String DFS_HA_NAMENODE_ID_KEY = "dfs.ha.namenode.id";

+ 30 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -682,7 +682,7 @@ public class DistributedFileSystem extends FileSystem {
   }
 
   /** Set a directory's quotas
-   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setQuota(String, long, long) 
+   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setQuota(String, long, long, StorageType)
    */
   public void setQuota(Path src, final long namespaceQuota,
       final long diskspaceQuota) throws IOException {
@@ -704,6 +704,35 @@ public class DistributedFileSystem extends FileSystem {
     }.resolve(this, absF);
   }
 
+  /**
+   * Set the per type storage quota of a directory.
+   *
+   * @param src target directory whose quota is to be modified.
+   * @param type storage type of the specific storage type quota to be modified.
+   * @param spaceQuota value of the specific storage type quota to be modified.
+   * Maybe {@link HdfsConstants#QUOTA_RESET} to clear quota by storage type.
+   */
+  public void setQuotaByStorageType(
+    Path src, final StorageType type, final long spaceQuota)
+    throws IOException {
+    Path absF = fixRelativePart(src);
+    new FileSystemLinkResolver<Void>() {
+      @Override
+      public Void doCall(final Path p)
+        throws IOException, UnresolvedLinkException {
+        dfs.setQuotaByStorageType(getPathName(p), type, spaceQuota);
+        return null;
+      }
+      @Override
+      public Void next(final FileSystem fs, final Path p)
+        throws IOException {
+        // setQuotaByStorageType is not defined in FileSystem, so we only can resolve
+        // within this DFS
+        return doCall(p);
+      }
+    }.resolve(this, absF);
+  }
+
   private FileStatus[] listStatusInternal(Path p) throws IOException {
     String src = getPathName(p);
 

+ 27 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java

@@ -45,12 +45,18 @@ public enum StorageType {
 
   private static final StorageType[] VALUES = values();
 
-  StorageType(boolean isTransient) { this.isTransient = isTransient; }
+  StorageType(boolean isTransient) {
+    this.isTransient = isTransient;
+  }
 
   public boolean isTransient() {
     return isTransient;
   }
 
+  public boolean supportTypeQuota() {
+    return !isTransient;
+  }
+
   public boolean isMovable() {
     return !isTransient;
   }
@@ -60,12 +66,28 @@ public enum StorageType {
   }
 
   public static List<StorageType> getMovableTypes() {
-    List<StorageType> movableTypes = new ArrayList<StorageType>();
+    return getNonTransientTypes();
+  }
+
+  public static List<StorageType> getTypesSupportingQuota() {
+    return getNonTransientTypes();
+  }
+
+  public static StorageType parseStorageType(int i) {
+    return VALUES[i];
+  }
+
+  public static StorageType parseStorageType(String s) {
+    return StorageType.valueOf(s.toUpperCase());
+  }
+
+  private static List<StorageType> getNonTransientTypes() {
+    List<StorageType> nonTransientTypes = new ArrayList<>();
     for (StorageType t : VALUES) {
       if ( t.isTransient == false ) {
-        movableTypes.add(t);
+        nonTransientTypes.add(t);
       }
     }
-    return movableTypes;
+    return nonTransientTypes;
   }
-}
+}

+ 27 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
@@ -116,6 +117,32 @@ public class HdfsAdmin {
   public void clearSpaceQuota(Path src) throws IOException {
     dfs.setQuota(src, HdfsConstants.QUOTA_DONT_SET, HdfsConstants.QUOTA_RESET);
   }
+
+  /**
+   * Set the quota by storage type for a directory. Note that
+   * directories and sym links do not occupy disk space.
+   *
+   * @param src the target directory to set the quota by storage type
+   * @param type the storage type to set for quota by storage type
+   * @param spaceQuota the value to set for quota by storage type
+   * @throws IOException in the event of error
+   */
+  public void setQuotaByStorageType(Path src, StorageType type, long spaceQuota)
+      throws IOException {
+    dfs.setQuotaByStorageType(src, type, spaceQuota);
+  }
+
+  /**
+   * Clear the space quota by storage type for a directory. Note that
+   * directories and sym links do not occupy disk space.
+   *
+   * @param src the target directory to clear the quota by storage type
+   * @param type the storage type to clear for quota by storage type
+   * @throws IOException in the event of error
+   */
+  public void clearQuotaByStorageType(Path src, StorageType type) throws IOException {
+    dfs.setQuotaByStorageType(src, type, HdfsConstants.QUOTA_RESET);
+  }
   
   /**
    * Allow snapshot on a directory.

+ 6 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.AtMostOnce;
@@ -950,7 +951,9 @@ public interface ClientProtocol {
    * @param namespaceQuota Limit on the number of names in the tree rooted 
    *                       at the directory
    * @param diskspaceQuota Limit on disk space occupied all the files under
-   *                       this directory. 
+   *                       this directory.
+   * @param type StorageType that the space quota is intended to be set on.
+   *             It may be null when called by traditional space/namespace quota.
    * <br><br>
    *                       
    * The quota can have three types of values : (1) 0 or more will set 
@@ -967,8 +970,8 @@ public interface ClientProtocol {
    * @throws IOException If an I/O error occurred
    */
   @Idempotent
-  public void setQuota(String path, long namespaceQuota, long diskspaceQuota)
-      throws AccessControlException, FileNotFoundException,
+  public void setQuota(String path, long namespaceQuota, long diskspaceQuota,
+      StorageType type) throws AccessControlException, FileNotFoundException,
       UnresolvedLinkException, SnapshotAccessControlException, IOException;
 
   /**

+ 56 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaByStorageTypeExceededException.java

@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.StorageType;
+
+import static org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix.long2String;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class QuotaByStorageTypeExceededException extends QuotaExceededException {
+  protected static final long serialVersionUID = 1L;
+  protected StorageType type;
+
+  public QuotaByStorageTypeExceededException() {}
+
+  public QuotaByStorageTypeExceededException(String msg) {
+    super(msg);
+  }
+
+  public QuotaByStorageTypeExceededException(long quota, long count, StorageType type) {
+    super(quota, count);
+    this.type = type;
+  }
+
+  @Override
+  public String getMessage() {
+    String msg = super.getMessage();
+    if (msg == null) {
+      return "Quota by storage type : " + type.toString() +
+          " on path : " + (pathName==null ? "": pathName) +
+          " is exceeded. quota = "  + long2String(quota, "B", 2) +
+          " but space consumed = " + long2String(count, "B", 2);
+    } else {
+      return msg;
+    }
+  }
+}

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java

@@ -881,7 +881,9 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       SetQuotaRequestProto req) throws ServiceException {
     try {
       server.setQuota(req.getPath(), req.getNamespaceQuota(),
-          req.getDiskspaceQuota());
+          req.getDiskspaceQuota(),
+          req.hasStorageType() ?
+          PBHelper.convertStorageType(req.getStorageType()): null);
       return VOID_SETQUOTA_RESPONSE;
     } catch (IOException e) {
       throw new ServiceException(e);

+ 10 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java

@@ -174,6 +174,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtobufHelper;
@@ -808,14 +809,19 @@ public class ClientNamenodeProtocolTranslatorPB implements
   }
 
   @Override
-  public void setQuota(String path, long namespaceQuota, long diskspaceQuota)
+  public void setQuota(String path, long namespaceQuota, long diskspaceQuota,
+                       StorageType type)
       throws AccessControlException, FileNotFoundException,
       UnresolvedLinkException, IOException {
-    SetQuotaRequestProto req = SetQuotaRequestProto.newBuilder()
+    final SetQuotaRequestProto.Builder builder
+        = SetQuotaRequestProto.newBuilder()
         .setPath(path)
         .setNamespaceQuota(namespaceQuota)
-        .setDiskspaceQuota(diskspaceQuota)
-        .build();
+        .setDiskspaceQuota(diskspaceQuota);
+    if (type != null) {
+      builder.setStorageType(PBHelper.convertStorageType(type));
+    }
+    final SetQuotaRequestProto req = builder.build();
     try {
       rpcProxy.setQuota(null, req);
     } catch (ServiceException e) {

+ 7 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -68,6 +68,7 @@ import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.inotify.EventBatch;
 import org.apache.hadoop.hdfs.inotify.EventBatchList;
 import org.apache.hadoop.hdfs.protocol.AclException;
@@ -1184,9 +1185,14 @@ class NameNodeRpcServer implements NamenodeProtocols {
   }
 
   @Override // ClientProtocol
-  public void setQuota(String path, long namespaceQuota, long diskspaceQuota) 
+  public void setQuota(String path, long namespaceQuota, long diskspaceQuota,
+                       StorageType type)
       throws IOException {
     checkNNStartup();
+    if (type != null) {
+      throw new UnsupportedActionException(
+          "Quota by storage type support is not fully supported by namenode yet.");
+    }
     namesystem.setQuota(path, namespaceQuota, diskspaceQuota);
   }
   

+ 32 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java

@@ -71,6 +71,7 @@ import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.ipc.GenericRefreshProtocol;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
@@ -173,7 +174,7 @@ public class DFSAdmin extends FsShell {
       "\t\tNote: A quota of 1 would force the directory to remain empty.\n";
 
     private final long quota; // the quota to be set
-    
+
     /** Constructor */
     SetQuotaCommand(String[] args, int pos, FileSystem fs) {
       super(fs);
@@ -206,19 +207,27 @@ public class DFSAdmin extends FsShell {
   /** A class that supports command clearSpaceQuota */
   private static class ClearSpaceQuotaCommand extends DFSAdminCommand {
     private static final String NAME = "clrSpaceQuota";
-    private static final String USAGE = "-"+NAME+" <dirname>...<dirname>";
+    private static final String USAGE = "-"+NAME+" <dirname>...<dirname> -storageType <storagetype>";
     private static final String DESCRIPTION = USAGE + ": " +
     "Clear the disk space quota for each directory <dirName>.\n" +
     "\t\tFor each directory, attempt to clear the quota. An error will be reported if\n" +
     "\t\t1. the directory does not exist or is a file, or\n" +
     "\t\t2. user is not an administrator.\n" +
-    "\t\tIt does not fault if the directory has no quota.";
-    
+    "\t\tIt does not fault if the directory has no quota.\n" +
+    "\t\tThe storage type specific quota is cleared when -storageType option is specified.";
+
+    private StorageType type;
+
     /** Constructor */
     ClearSpaceQuotaCommand(String[] args, int pos, FileSystem fs) {
       super(fs);
       CommandFormat c = new CommandFormat(1, Integer.MAX_VALUE);
       List<String> parameters = c.parse(args, pos);
+      String storageTypeString =
+          StringUtils.popOptionWithArgument("-storageType", parameters);
+      if (storageTypeString != null) {
+        this.type = StorageType.parseStorageType(storageTypeString);
+      }
       this.args = parameters.toArray(new String[parameters.size()]);
     }
     
@@ -238,7 +247,11 @@ public class DFSAdmin extends FsShell {
 
     @Override
     public void run(Path path) throws IOException {
-      dfs.setQuota(path, HdfsConstants.QUOTA_DONT_SET, HdfsConstants.QUOTA_RESET);
+      if (type != null) {
+        dfs.setQuotaByStorageType(path, type, HdfsConstants.QUOTA_RESET);
+      } else {
+        dfs.setQuota(path, HdfsConstants.QUOTA_DONT_SET, HdfsConstants.QUOTA_RESET);
+      }
     }
   }
   
@@ -246,7 +259,7 @@ public class DFSAdmin extends FsShell {
   private static class SetSpaceQuotaCommand extends DFSAdminCommand {
     private static final String NAME = "setSpaceQuota";
     private static final String USAGE =
-      "-"+NAME+" <quota> <dirname>...<dirname>";
+      "-"+NAME+" <quota> <dirname>...<dirname> -storageType <storagetype>";
     private static final String DESCRIPTION = USAGE + ": " +
       "Set the disk space quota <quota> for each directory <dirName>.\n" + 
       "\t\tThe space quota is a long integer that puts a hard limit\n" +
@@ -258,9 +271,11 @@ public class DFSAdmin extends FsShell {
       "\t\tFor each directory, attempt to set the quota. An error will be reported if\n" +
       "\t\t1. N is not a positive integer, or\n" +
       "\t\t2. user is not an administrator, or\n" +
-      "\t\t3. the directory does not exist or is a file, or\n";
+      "\t\t3. the directory does not exist or is a file.\n" +
+      "\t\tThe storage type specific quota is set when -storageType option is specified.\n";
 
     private long quota; // the quota to be set
+    private StorageType type;
     
     /** Constructor */
     SetSpaceQuotaCommand(String[] args, int pos, FileSystem fs) {
@@ -273,6 +288,11 @@ public class DFSAdmin extends FsShell {
       } catch (NumberFormatException nfe) {
         throw new IllegalArgumentException("\"" + str + "\" is not a valid value for a quota.");
       }
+      String storageTypeString =
+          StringUtils.popOptionWithArgument("-storageType", parameters);
+      if (storageTypeString != null) {
+        this.type = StorageType.parseStorageType(storageTypeString);
+      }
       
       this.args = parameters.toArray(new String[parameters.size()]);
     }
@@ -293,7 +313,11 @@ public class DFSAdmin extends FsShell {
 
     @Override
     public void run(Path path) throws IOException {
-      dfs.setQuota(path, HdfsConstants.QUOTA_DONT_SET, quota);
+      if (type != null) {
+        dfs.setQuotaByStorageType(path, type, quota);
+      } else {
+        dfs.setQuota(path, HdfsConstants.QUOTA_DONT_SET, quota);
+      }
     }
   }
 

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto

@@ -570,6 +570,7 @@ message SetQuotaRequestProto {
   required string path = 1;
   required uint64 namespaceQuota = 2;
   required uint64 diskspaceQuota = 3;
+  optional StorageTypeProto storageType = 4;
 }
 
 message SetQuotaResponseProto { // void response

+ 14 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml

@@ -15508,6 +15508,10 @@
           <type>RegexpComparator</type>
           <expected-output>^( |\t)*It does not fault if the directory has no quota.( )*</expected-output>
         </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^( |\t)*The storage type specific quota is cleared when -storageType option is specified.( )*</expected-output>
+        </comparator>
       </comparators>
     </test>
 
@@ -15521,7 +15525,7 @@
       <comparators>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^-setSpaceQuota &lt;quota&gt; &lt;dirname&gt;...&lt;dirname&gt;: Set the disk space quota &lt;quota&gt; for each directory &lt;dirName&gt;.( )*</expected-output>
+          <expected-output>^-setSpaceQuota &lt;quota&gt; &lt;dirname&gt;...&lt;dirname&gt; -storageType &lt;storagetype&gt;: Set the disk space quota &lt;quota&gt; for each directory &lt;dirName&gt;.( )*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
@@ -15535,6 +15539,14 @@
           <type>RegexpComparator</type>
           <expected-output>^( |\t)*2. user is not an administrator, or( )*</expected-output>
         </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^( |\t)*3. the directory does not exist or is a file.( )*</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^( |\t)*The storage type specific quota is set when -storageType option is specified.( )*</expected-output>
+        </comparator>
       </comparators>
     </test>
 
@@ -15548,7 +15560,7 @@
       <comparators>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^-clrSpaceQuota &lt;dirname&gt;...&lt;dirname&gt;: Clear the disk space quota for each directory &lt;dirName&gt;.( )*</expected-output>
+          <expected-output>^-clrSpaceQuota &lt;dirname&gt;...&lt;dirname&gt; -storageType &lt;storagetype&gt;: Clear the disk space quota for each directory &lt;dirName&gt;.( )*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>