Browse Source

HDDS-1544. Support default Acls for volume, bucket, keys and prefix. Contributed by Ajay Kumar, Xiaoyu Yao.

Signed-off-by: Anu Engineer <aengineer@apache.org>
Xiaoyu Yao 5 years ago
parent
commit
d429d742f0
27 changed files with 673 additions and 269 deletions
  1. 8 7
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
  2. 87 16
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OzoneAcl.java
  3. 4 5
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketArgs.java
  4. 2 2
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
  5. 64 20
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmOzoneAclMap.java
  6. 2 3
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmPrefixInfo.java
  7. 1 2
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
  8. 0 71
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OMPBHelper.java
  9. 33 3
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/web/utils/OzoneUtils.java
  10. 6 0
      hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
  11. 79 1
      hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOzoneAcls.java
  12. 2 1
      hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/codec/TestOmPrefixInfoCodec.java
  13. 18 18
      hadoop-ozone/dist/src/main/smoketest/basic/ozone-shell.robot
  14. 18 18
      hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-fs.robot
  15. 144 32
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
  16. 8 7
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
  17. 6 5
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java
  18. 13 8
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneNativeAuthorizer.java
  19. 1 1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestBuckets.java
  20. 8 7
      hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
  21. 11 2
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
  22. 91 16
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
  23. 10 12
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
  24. 48 4
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManagerImpl.java
  25. 2 3
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManagerImpl.java
  26. 2 1
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/web/ozShell/bucket/UpdateBucketHandler.java
  27. 5 4
      hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestBucketManagerImpl.java

+ 8 - 7
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java

@@ -75,7 +75,6 @@ import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.protocol.proto
     .OzoneManagerProtocolProtos.ServicePort;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.protocolPB
@@ -103,6 +102,8 @@ import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
+
 /**
  * Ozone RPC Client Implementation, it connects to OM, SCM and DataNode
  * to execute client calls. This uses RPC protocol for communication
@@ -259,12 +260,12 @@ public class RpcClient implements ClientProtocol {
     List<OzoneAcl> listOfAcls = new ArrayList<>();
     //User ACL
     listOfAcls.add(new OzoneAcl(ACLIdentityType.USER,
-            owner, userRights));
+            owner, userRights, ACCESS));
     //Group ACLs of the User
     List<String> userGroups = Arrays.asList(UserGroupInformation
         .createRemoteUser(owner).getGroupNames());
     userGroups.stream().forEach((group) -> listOfAcls.add(
-        new OzoneAcl(ACLIdentityType.GROUP, group, groupRights)));
+        new OzoneAcl(ACLIdentityType.GROUP, group, groupRights, ACCESS)));
     //ACLs from VolumeArgs
     if(volArgs.getAcls() != null) {
       listOfAcls.addAll(volArgs.getAcls());
@@ -280,7 +281,7 @@ public class RpcClient implements ClientProtocol {
     //Remove duplicates and add ACLs
     for (OzoneAcl ozoneAcl :
         listOfAcls.stream().distinct().collect(Collectors.toList())) {
-      builder.addOzoneAcls(OMPBHelper.convertOzoneAcl(ozoneAcl));
+      builder.addOzoneAcls(OzoneAcl.toProtobuf(ozoneAcl));
     }
 
     if (volArgs.getQuota() == null) {
@@ -323,7 +324,7 @@ public class RpcClient implements ClientProtocol {
         volume.getQuotaInBytes(),
         volume.getCreationTime(),
         volume.getAclMap().ozoneAclGetProtobuf().stream().
-            map(OMPBHelper::convertOzoneAcl).collect(Collectors.toList()),
+            map(OzoneAcl::fromProtobuf).collect(Collectors.toList()),
         volume.getMetadata());
   }
 
@@ -355,7 +356,7 @@ public class RpcClient implements ClientProtocol {
         volume.getQuotaInBytes(),
         volume.getCreationTime(),
         volume.getAclMap().ozoneAclGetProtobuf().stream().
-            map(OMPBHelper::convertOzoneAcl).collect(Collectors.toList())))
+            map(OzoneAcl::fromProtobuf).collect(Collectors.toList())))
         .collect(Collectors.toList());
   }
 
@@ -375,7 +376,7 @@ public class RpcClient implements ClientProtocol {
         volume.getQuotaInBytes(),
         volume.getCreationTime(),
         volume.getAclMap().ozoneAclGetProtobuf().stream().
-            map(OMPBHelper::convertOzoneAcl).collect(Collectors.toList()),
+            map(OzoneAcl::fromProtobuf).collect(Collectors.toList()),
         volume.getMetadata()))
         .collect(Collectors.toList());
   }

+ 87 - 16
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OzoneAcl.java

@@ -22,6 +22,7 @@ package org.apache.hadoop.ozone;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.google.protobuf.ByteString;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclScope;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclType;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
@@ -44,9 +45,12 @@ import java.util.stream.Collectors;
  */
 @JsonIgnoreProperties(value = {"aclBitSet"})
 public class OzoneAcl {
+
+  private static final String ACL_SCOPE_REGEX = ".*\\[(ACCESS|DEFAULT)\\]";
   private ACLIdentityType type;
   private String name;
   private BitSet aclBitSet;
+  private AclScope aclScope;
   private static final List<ACLType> EMPTY_LIST = new ArrayList<>(0);
   public static final BitSet ZERO_BITSET = new BitSet(0);
 
@@ -59,11 +63,13 @@ public class OzoneAcl {
   /**
    * Constructor for OzoneAcl.
    *
-   * @param type - Type
-   * @param name - Name of user
-   * @param acl - Rights
+   * @param type   - Type
+   * @param name   - Name of user
+   * @param acl    - Rights
+   * @param scope  - AclScope
    */
-  public OzoneAcl(ACLIdentityType type, String name, ACLType acl) {
+  public OzoneAcl(ACLIdentityType type, String name, ACLType acl,
+      AclScope scope) {
     this.name = name;
     this.aclBitSet = new BitSet(ACLType.getNoOfAcls());
     aclBitSet.set(acl.ordinal(), true);
@@ -83,16 +89,19 @@ public class OzoneAcl {
         && (name.length() == 0)) {
       throw new IllegalArgumentException("User or group name is required");
     }
+    aclScope = scope;
   }
 
   /**
    * Constructor for OzoneAcl.
    *
-   * @param type - Type
-   * @param name - Name of user
-   * @param acls - Rights
+   * @param type   - Type
+   * @param name   - Name of user
+   * @param acls   - Rights
+   * @param scope  - AclScope
    */
-  public OzoneAcl(ACLIdentityType type, String name, BitSet acls) {
+  public OzoneAcl(ACLIdentityType type, String name, BitSet acls,
+      AclScope scope) {
     Objects.requireNonNull(type);
     Objects.requireNonNull(acls);
 
@@ -120,16 +129,19 @@ public class OzoneAcl {
         && (name.length() == 0)) {
       throw new IllegalArgumentException("User or group name is required");
     }
+    aclScope = scope;
   }
 
   /**
-   * Parses an ACL string and returns the ACL object.
+   * Parses an ACL string and returns the ACL object. If acl scope is not
+   * passed in input string then scope is set to ACCESS.
    *
    * @param acl - Acl String , Ex. user:anu:rw
    *
    * @return - Ozone ACLs
    */
-  public static OzoneAcl parseAcl(String acl) throws IllegalArgumentException {
+  public static OzoneAcl parseAcl(String acl)
+      throws IllegalArgumentException {
     if ((acl == null) || acl.isEmpty()) {
       throw new IllegalArgumentException("ACLs cannot be null or empty");
     }
@@ -141,13 +153,27 @@ public class OzoneAcl {
     ACLIdentityType aclType = ACLIdentityType.valueOf(parts[0].toUpperCase());
     BitSet acls = new BitSet(ACLType.getNoOfAcls());
 
-    for (char ch : parts[2].toCharArray()) {
+    String bits = parts[2];
+
+    // Default acl scope is ACCESS.
+    AclScope aclScope = AclScope.ACCESS;
+
+    // Check if acl string contains scope info.
+    if(parts[2].matches(ACL_SCOPE_REGEX)) {
+      int indexOfOpenBracket = parts[2].indexOf("[");
+      bits = parts[2].substring(0, indexOfOpenBracket);
+      aclScope = AclScope.valueOf(parts[2].substring(indexOfOpenBracket + 1,
+          parts[2].indexOf("]")));
+    }
+
+    // Set all acl bits.
+    for (char ch : bits.toCharArray()) {
       acls.set(ACLType.getACLRight(String.valueOf(ch)).ordinal());
     }
 
     // TODO : Support sanitation of these user names by calling into
     // userAuth Interface.
-    return new OzoneAcl(aclType, parts[1], acls);
+    return new OzoneAcl(aclType, parts[1], acls, aclScope);
   }
 
   /**
@@ -178,6 +204,7 @@ public class OzoneAcl {
     OzoneAclInfo.Builder builder = OzoneAclInfo.newBuilder()
         .setName(acl.getName())
         .setType(OzoneAclType.valueOf(acl.getType().name()))
+        .setAclScope(OzoneAclScope.valueOf(acl.getAclScope().name()))
         .setRights(ByteString.copyFrom(acl.getAclBitSet().toByteArray()));
     return builder.build();
   }
@@ -185,12 +212,47 @@ public class OzoneAcl {
   public static OzoneAcl fromProtobuf(OzoneAclInfo protoAcl) {
     BitSet aclRights = BitSet.valueOf(protoAcl.getRights().toByteArray());
     return new OzoneAcl(ACLIdentityType.valueOf(protoAcl.getType().name()),
-        protoAcl.getName(), aclRights);
+        protoAcl.getName(), aclRights,
+        AclScope.valueOf(protoAcl.getAclScope().name()));
+  }
+
+  /**
+   * Helper function to convert a proto message of type {@link OzoneAclInfo}
+   * to {@link OzoneAcl} with acl scope of type ACCESS.
+   *
+   * @param protoAcl
+   * @return OzoneAcl
+   * */
+  public static OzoneAcl fromProtobufWithAccessType(OzoneAclInfo protoAcl) {
+    BitSet aclRights = BitSet.valueOf(protoAcl.getRights().toByteArray());
+    return new OzoneAcl(ACLIdentityType.valueOf(protoAcl.getType().name()),
+        protoAcl.getName(), aclRights, AclScope.ACCESS);
+  }
+
+  /**
+   * Helper function to convert an {@link OzoneAcl} to proto message of type
+   * {@link OzoneAclInfo} with acl scope of type ACCESS.
+   *
+   * @param acl
+   * @return OzoneAclInfo
+   * */
+  public static OzoneAclInfo toProtobufWithAccessType(OzoneAcl acl) {
+    OzoneAclInfo.Builder builder = OzoneAclInfo.newBuilder()
+        .setName(acl.getName())
+        .setType(OzoneAclType.valueOf(acl.getType().name()))
+        .setAclScope(OzoneAclScope.ACCESS)
+        .setRights(ByteString.copyFrom(acl.getAclBitSet().toByteArray()));
+    return builder.build();
+  }
+
+  public AclScope getAclScope() {
+    return aclScope;
   }
 
   @Override
   public String toString() {
-    return type + ":" + name + ":" + ACLType.getACLString(aclBitSet);
+    return type + ":" + name + ":" + ACLType.getACLString(aclBitSet)
+        + "[" + aclScope + "]";
   }
 
   /**
@@ -205,7 +267,7 @@ public class OzoneAcl {
   @Override
   public int hashCode() {
     return Objects.hash(this.getName(), this.getAclBitSet(),
-                        this.getType().toString());
+                        this.getType().toString(), this.getAclScope());
   }
 
   /**
@@ -262,6 +324,15 @@ public class OzoneAcl {
     OzoneAcl otherAcl = (OzoneAcl) obj;
     return otherAcl.getName().equals(this.getName()) &&
         otherAcl.getType().equals(this.getType()) &&
-        otherAcl.getAclBitSet().equals(this.getAclBitSet());
+        otherAcl.getAclBitSet().equals(this.getAclBitSet()) &&
+        otherAcl.getAclScope().equals(this.getAclScope());
+  }
+
+  /**
+   * Scope of ozone acl.
+   * */
+  public enum AclScope {
+    ACCESS,
+    DEFAULT;
   }
 }

+ 4 - 5
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketArgs.java

@@ -27,7 +27,6 @@ import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.audit.Auditable;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketArgs;
-import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
 
 import com.google.common.base.Preconditions;
 
@@ -227,11 +226,11 @@ public final class OmBucketArgs extends WithMetadata implements Auditable {
         .setBucketName(bucketName);
     if(addAcls != null && !addAcls.isEmpty()) {
       builder.addAllAddAcls(addAcls.stream().map(
-          OMPBHelper::convertOzoneAcl).collect(Collectors.toList()));
+          OzoneAcl::toProtobuf).collect(Collectors.toList()));
     }
     if(removeAcls != null && !removeAcls.isEmpty()) {
       builder.addAllRemoveAcls(removeAcls.stream().map(
-          OMPBHelper::convertOzoneAcl).collect(Collectors.toList()));
+          OzoneAcl::toProtobuf).collect(Collectors.toList()));
     }
     if(isVersionEnabled != null) {
       builder.setIsVersionEnabled(isVersionEnabled);
@@ -251,9 +250,9 @@ public final class OmBucketArgs extends WithMetadata implements Auditable {
     return new OmBucketArgs(bucketArgs.getVolumeName(),
         bucketArgs.getBucketName(),
         bucketArgs.getAddAclsList().stream().map(
-            OMPBHelper::convertOzoneAcl).collect(Collectors.toList()),
+            OzoneAcl::fromProtobuf).collect(Collectors.toList()),
         bucketArgs.getRemoveAclsList().stream().map(
-            OMPBHelper::convertOzoneAcl).collect(Collectors.toList()),
+            OzoneAcl::fromProtobuf).collect(Collectors.toList()),
         bucketArgs.hasIsVersionEnabled() ?
             bucketArgs.getIsVersionEnabled() : null,
         bucketArgs.hasStorageType() ? StorageType.valueOf(

+ 2 - 2
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java

@@ -274,7 +274,7 @@ public final class OmBucketInfo extends WithMetadata implements Auditable {
     BucketInfo.Builder bib =  BucketInfo.newBuilder()
         .setVolumeName(volumeName)
         .setBucketName(bucketName)
-        .addAllAcls(acls.stream().map(OMPBHelper::convertOzoneAcl)
+        .addAllAcls(acls.stream().map(OzoneAcl::toProtobuf)
             .collect(Collectors.toList()))
         .setIsVersionEnabled(isVersionEnabled)
         .setStorageType(storageType.toProto())
@@ -296,7 +296,7 @@ public final class OmBucketInfo extends WithMetadata implements Auditable {
         .setVolumeName(bucketInfo.getVolumeName())
         .setBucketName(bucketInfo.getBucketName())
         .setAcls(bucketInfo.getAclsList().stream().map(
-            OMPBHelper::convertOzoneAcl).collect(Collectors.toList()))
+            OzoneAcl::fromProtobuf).collect(Collectors.toList()))
         .setIsVersionEnabled(bucketInfo.getIsVersionEnabled())
         .setStorageType(StorageType.valueOf(bucketInfo.getStorageType()))
         .setCreationTime(bucketInfo.getCreationTime());

+ 64 - 20
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmOzoneAclMap.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.protocol.proto
     .OzoneManagerProtocolProtos.OzoneAclInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclScope;
 import org.apache.hadoop.ozone.protocol.proto
     .OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclType;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
@@ -31,12 +32,14 @@ import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import java.util.BitSet;
+import java.util.Collection;
 import java.util.List;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.ozone.OzoneAcl.ZERO_BITSET;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
@@ -49,31 +52,42 @@ import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.NON
 @SuppressWarnings("ProtocolBufferOrdinal")
 public class OmOzoneAclMap {
   // per Acl Type user:rights map
-  private ArrayList<Map<String, BitSet>> aclMaps;
+  private ArrayList<Map<String, BitSet>> accessAclMap;
+  private List<OzoneAclInfo> defaultAclList;
 
   OmOzoneAclMap() {
-    aclMaps = new ArrayList<>();
+    accessAclMap = new ArrayList<>();
+    defaultAclList = new ArrayList<>();
     for (OzoneAclType aclType : OzoneAclType.values()) {
-      aclMaps.add(aclType.ordinal(), new HashMap<>());
+      accessAclMap.add(aclType.ordinal(), new HashMap<>());
     }
   }
 
-  private Map<String, BitSet> getMap(OzoneAclType type) {
-    return aclMaps.get(type.ordinal());
+  private Map<String, BitSet> getAccessAclMap(OzoneAclType type) {
+    return accessAclMap.get(type.ordinal());
   }
 
   // For a given acl type and user, get the stored acl
   private BitSet getAcl(OzoneAclType type, String user) {
-    return getMap(type).get(user);
+    return getAccessAclMap(type).get(user);
   }
 
   public List<OzoneAcl> getAcl() {
     List<OzoneAcl> acls = new ArrayList<>();
 
+    acls.addAll(getAccessAcls());
+    acls.addAll(defaultAclList.stream().map(a ->
+        OzoneAcl.fromProtobuf(a)).collect(Collectors.toList()));
+    return acls;
+  }
+
+  private Collection<? extends OzoneAcl> getAccessAcls() {
+    List<OzoneAcl> acls = new ArrayList<>();
     for (OzoneAclType type : OzoneAclType.values()) {
-      aclMaps.get(type.ordinal()).entrySet().stream().
+      accessAclMap.get(type.ordinal()).entrySet().stream().
           forEach(entry -> acls.add(new OzoneAcl(ACLIdentityType.
-              valueOf(type.name()), entry.getKey(), entry.getValue())));
+              valueOf(type.name()), entry.getKey(), entry.getValue(),
+              OzoneAcl.AclScope.ACCESS)));
     }
     return acls;
   }
@@ -81,13 +95,19 @@ public class OmOzoneAclMap {
   // Add a new acl to the map
   public void addAcl(OzoneAcl acl) throws OMException {
     Objects.requireNonNull(acl, "Acl should not be null.");
+    if (acl.getAclScope().equals(OzoneAcl.AclScope.DEFAULT)) {
+      defaultAclList.add(OzoneAcl.toProtobuf(acl));
+      return;
+    }
+
     OzoneAclType aclType = OzoneAclType.valueOf(acl.getType().name());
-    if (!getMap(aclType).containsKey(acl.getName())) {
-      getMap(aclType).put(acl.getName(), acl.getAclBitSet());
+    if (!getAccessAclMap(aclType).containsKey(acl.getName())) {
+      getAccessAclMap(aclType).put(acl.getName(), acl.getAclBitSet());
     } else {
       // Check if we are adding new rights to existing acl.
       BitSet temp = (BitSet) acl.getAclBitSet().clone();
-      BitSet curRights = (BitSet) getMap(aclType).get(acl.getName()).clone();
+      BitSet curRights = (BitSet) getAccessAclMap(aclType).
+          get(acl.getName()).clone();
       temp.or(curRights);
 
       if (temp.equals(curRights)) {
@@ -95,7 +115,7 @@ public class OmOzoneAclMap {
         throw new OMException("Acl " + acl + " already exist.",
             INVALID_REQUEST);
       }
-      getMap(aclType).replace(acl.getName(), temp);
+      getAccessAclMap(aclType).replace(acl.getName(), temp);
     }
   }
 
@@ -104,9 +124,8 @@ public class OmOzoneAclMap {
     Objects.requireNonNull(acls, "Acls should not be null.");
     // Remove all Acls.
     for (OzoneAclType type : OzoneAclType.values()) {
-      aclMaps.get(type.ordinal()).clear();
+      accessAclMap.get(type.ordinal()).clear();
     }
-
     // Add acls.
     for (OzoneAcl acl : acls) {
       addAcl(acl);
@@ -116,9 +135,14 @@ public class OmOzoneAclMap {
   // Add a new acl to the map
   public void removeAcl(OzoneAcl acl) throws OMException {
     Objects.requireNonNull(acl, "Acl should not be null.");
+    if (acl.getAclScope().equals(OzoneAcl.AclScope.DEFAULT)) {
+      defaultAclList.remove(OzoneAcl.toProtobuf(acl));
+      return;
+    }
+
     OzoneAclType aclType = OzoneAclType.valueOf(acl.getType().name());
-    if (getMap(aclType).containsKey(acl.getName())) {
-      BitSet aclRights = getMap(aclType).get(acl.getName());
+    if (getAccessAclMap(aclType).containsKey(acl.getName())) {
+      BitSet aclRights = getAccessAclMap(aclType).get(acl.getName());
       BitSet bits = (BitSet) acl.getAclBitSet().clone();
       bits.and(aclRights);
 
@@ -133,7 +157,7 @@ public class OmOzoneAclMap {
 
       // Remove the acl as all rights are already set to 0.
       if (aclRights.equals(ZERO_BITSET)) {
-        getMap(aclType).remove(acl.getName());
+        getAccessAclMap(aclType).remove(acl.getName());
       }
     } else {
       // throw exception if acl doesn't exist.
@@ -145,9 +169,14 @@ public class OmOzoneAclMap {
   // Add a new acl to the map
   public void addAcl(OzoneAclInfo acl) throws OMException {
     Objects.requireNonNull(acl, "Acl should not be null.");
-    if (!getMap(acl.getType()).containsKey(acl.getName())) {
+    if (acl.getAclScope().equals(OzoneAclInfo.OzoneAclScope.DEFAULT)) {
+      defaultAclList.add(acl);
+      return;
+    }
+
+    if (!getAccessAclMap(acl.getType()).containsKey(acl.getName())) {
       BitSet acls = BitSet.valueOf(acl.getRights().toByteArray());
-      getMap(acl.getType()).put(acl.getName(), acls);
+      getAccessAclMap(acl.getType()).put(acl.getName(), acls);
     } else {
       // throw exception if acl is already added.
 
@@ -234,15 +263,17 @@ public class OmOzoneAclMap {
     List<OzoneAclInfo> aclList = new LinkedList<>();
     for (OzoneAclType type : OzoneAclType.values()) {
       for (Map.Entry<String, BitSet> entry :
-          aclMaps.get(type.ordinal()).entrySet()) {
+          accessAclMap.get(type.ordinal()).entrySet()) {
         OzoneAclInfo.Builder builder = OzoneAclInfo.newBuilder()
             .setName(entry.getKey())
             .setType(type)
+            .setAclScope(OzoneAclScope.ACCESS)
             .setRights(ByteString.copyFrom(entry.getValue().toByteArray()));
 
         aclList.add(builder.build());
       }
     }
+    aclList.addAll(defaultAclList);
     return aclList;
   }
 
@@ -255,4 +286,17 @@ public class OmOzoneAclMap {
     }
     return aclMap;
   }
+
+  public Collection<? extends OzoneAcl> getAclsByScope(OzoneAclScope scope) {
+    if (scope.equals(OzoneAclScope.DEFAULT)) {
+      return defaultAclList.stream().map(a ->
+          OzoneAcl.fromProtobuf(a)).collect(Collectors.toList());
+    } else {
+      return getAcl();
+    }
+  }
+
+  public List<OzoneAclInfo> getDefaultAclList() {
+    return defaultAclList;
+  }
 }

+ 2 - 3
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmPrefixInfo.java

@@ -21,7 +21,6 @@ package org.apache.hadoop.ozone.om.helpers;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrefixInfo;
-import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
 
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -125,7 +124,7 @@ public final class OmPrefixInfo extends WithMetadata {
    */
   public PrefixInfo getProtobuf() {
     PrefixInfo.Builder pib =  PrefixInfo.newBuilder().setName(name)
-        .addAllAcls(acls.stream().map(OMPBHelper::convertOzoneAcl)
+        .addAllAcls(acls.stream().map(OzoneAcl::toProtobuf)
             .collect(Collectors.toList()))
         .addAllMetadata(KeyValueUtil.toProtobuf(metadata));
     return pib.build();
@@ -140,7 +139,7 @@ public final class OmPrefixInfo extends WithMetadata {
     OmPrefixInfo.Builder opib = OmPrefixInfo.newBuilder()
         .setName(prefixInfo.getName())
         .setAcls(prefixInfo.getAclsList().stream().map(
-            OMPBHelper::convertOzoneAcl).collect(Collectors.toList()));
+            OzoneAcl::fromProtobuf).collect(Collectors.toList()));
     if (prefixInfo.getMetadataList() != null) {
       opib.addAllMetadata(KeyValueUtil
           .getFromProtobuf(prefixInfo.getMetadataList()));

+ 1 - 2
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java

@@ -505,5 +505,4 @@ public interface OzoneManagerProtocol
    * */
   List<OzoneAcl> getAcl(OzoneObj obj) throws IOException;
 
-}
-
+}

+ 0 - 71
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OMPBHelper.java

@@ -22,7 +22,6 @@ import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -33,18 +32,10 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .CryptoProtocolVersionProto;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .FileEncryptionInfoProto;
-
-import org.apache.hadoop.ozone.protocol.proto
-    .OzoneManagerProtocolProtos.OzoneAclInfo;
-import org.apache.hadoop.ozone.protocol.proto
-    .OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclType;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.token.Token;
 
-import java.util.BitSet;
-
 /**
  * Utilities for converting protobuf classes.
  */
@@ -54,68 +45,6 @@ public final class OMPBHelper {
     /** Hidden constructor */
   }
 
-  /**
-   * Converts OzoneAcl into protobuf's OzoneAclInfo.
-   * @return OzoneAclInfo
-   */
-  public static OzoneAclInfo convertOzoneAcl(OzoneAcl acl) {
-    OzoneAclInfo.OzoneAclType aclType;
-    switch (acl.getType()) {
-    case USER:
-      aclType = OzoneAclType.USER;
-      break;
-    case GROUP:
-      aclType = OzoneAclType.GROUP;
-      break;
-    case WORLD:
-      aclType = OzoneAclType.WORLD;
-      break;
-    case ANONYMOUS:
-      aclType = OzoneAclType.ANONYMOUS;
-      break;
-    case CLIENT_IP:
-      aclType = OzoneAclType.CLIENT_IP;
-      break;
-    default:
-      throw new IllegalArgumentException("ACL type is not recognized");
-    }
-
-    return OzoneAclInfo.newBuilder().setType(aclType)
-        .setName(acl.getName())
-        .setRights(ByteString.copyFrom(acl.getAclBitSet().toByteArray()))
-        .build();
-  }
-
-  /**
-   * Converts protobuf's OzoneAclInfo into OzoneAcl.
-   * @return OzoneAcl
-   */
-  public static OzoneAcl convertOzoneAcl(OzoneAclInfo aclInfo) {
-    ACLIdentityType aclType;
-    switch (aclInfo.getType()) {
-    case USER:
-      aclType = ACLIdentityType.USER;
-      break;
-    case GROUP:
-      aclType = ACLIdentityType.GROUP;
-      break;
-    case WORLD:
-      aclType = ACLIdentityType.WORLD;
-      break;
-    case ANONYMOUS:
-      aclType = ACLIdentityType.ANONYMOUS;
-      break;
-    case CLIENT_IP:
-      aclType = ACLIdentityType.CLIENT_IP;
-      break;
-    default:
-      throw new IllegalArgumentException("ACL type is not recognized");
-    }
-
-    BitSet aclRights = BitSet.valueOf(aclInfo.getRights().toByteArray());
-    return new OzoneAcl(aclType, aclInfo.getName(), aclRights);
-  }
-
   /**
    * Converts Ozone delegation token to @{@link TokenProto}.
    * @return tokenProto

+ 33 - 3
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/web/utils/OzoneUtils.java

@@ -25,11 +25,13 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.Collection;
 import java.util.List;
 import java.util.Locale;
 import java.util.TimeZone;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -44,6 +46,8 @@ import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
 import org.apache.hadoop.ozone.security.acl.RequestContext;
 import org.apache.ratis.util.TimeDuration;
 
+import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
+import static org.apache.hadoop.ozone.OzoneAcl.AclScope.DEFAULT;
 import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.GROUP;
 import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.USER;
 import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
@@ -251,7 +255,7 @@ public final class OzoneUtils {
   }
 
   /**
-   * Helper function to get deafult acl list for current user.
+   * Helper function to get access acl list for current user.
    *
    * @param userName
    * @param userGroups
@@ -263,11 +267,11 @@ public final class OzoneUtils {
     List<OzoneAcl> listOfAcls = new ArrayList<>();
 
     // User ACL.
-    listOfAcls.add(new OzoneAcl(USER, userName, userRights));
+    listOfAcls.add(new OzoneAcl(USER, userName, userRights, ACCESS));
     if(userGroups != null) {
       // Group ACLs of the User.
       userGroups.forEach((group) -> listOfAcls.add(
-          new OzoneAcl(GROUP, group, groupRights)));
+          new OzoneAcl(GROUP, group, groupRights, ACCESS)));
     }
     return listOfAcls;
   }
@@ -365,4 +369,30 @@ public final class OzoneUtils {
         || bitset.get(ALL.ordinal()))
         && !bitset.get(NONE.ordinal()));
   }
+
+  /**
+   * Helper function to find and return all DEFAULT acls in input list with
+   * scope changed to ACCESS.
+   * @param acls
+   *
+   * @return list of default Acls.
+   * */
+  public static Collection<OzoneAclInfo> getDefaultAclsProto(
+      List<OzoneAcl> acls) {
+    return acls.stream().filter(a -> a.getAclScope() == DEFAULT)
+        .map(OzoneAcl::toProtobufWithAccessType).collect(Collectors.toList());
+  }
+
+  /**
+   * Helper function to find and return all DEFAULT acls in input list with
+   * scope changed to ACCESS.
+   * @param acls
+   *
+   * @return list of default Acls.
+   * */
+  public static Collection<OzoneAcl> getDefaultAcls(List<OzoneAcl> acls) {
+    return acls.stream().filter(a -> a.getAclScope() == DEFAULT)
+        .collect(Collectors.toList());
+  }
+
 }

+ 6 - 0
hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto

@@ -505,9 +505,15 @@ message OzoneAclInfo {
         CLIENT_IP = 5;
     }
 
+    enum OzoneAclScope {
+      ACCESS = 0;
+      DEFAULT = 1;
+    }
+
     required OzoneAclType type = 1;
     required string name = 2;
     required bytes rights = 3;
+    required OzoneAclScope aclScope = 4 [default = ACCESS];
 }
 
 message GetAclRequest {

+ 79 - 1
hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOzoneAcls.java

@@ -204,8 +204,66 @@ public class TestOzoneAcls {
     assertFalse(acl.getAclBitSet().get(ALL.ordinal()));
     assertEquals(ACLIdentityType.WORLD, acl.getType());
 
+    // Acls with scope info.
+    acl = OzoneAcl.parseAcl("user:bilbo:rwdlncxy[DEFAULT]");
+    assertEquals(acl.getName(), "bilbo");
+    assertTrue(acl.getAclBitSet().get(READ.ordinal()));
+    assertTrue(acl.getAclBitSet().get(WRITE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(DELETE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(LIST.ordinal()));
+    assertTrue(acl.getAclBitSet().get(NONE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(CREATE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(READ_ACL.ordinal()));
+    assertTrue(acl.getAclBitSet().get(WRITE_ACL.ordinal()));
+    assertFalse(acl.getAclBitSet().get(ALL.ordinal()));
+    assertTrue(acl.getAclScope().equals(OzoneAcl.AclScope.DEFAULT));
+
+    acl = OzoneAcl.parseAcl("user:bilbo:rwdlncxy[ACCESS]");
+    assertEquals(acl.getName(), "bilbo");
+    assertTrue(acl.getAclBitSet().get(READ.ordinal()));
+    assertTrue(acl.getAclBitSet().get(WRITE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(DELETE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(LIST.ordinal()));
+    assertTrue(acl.getAclBitSet().get(NONE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(CREATE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(READ_ACL.ordinal()));
+    assertTrue(acl.getAclBitSet().get(WRITE_ACL.ordinal()));
+    assertFalse(acl.getAclBitSet().get(ALL.ordinal()));
+    assertTrue(acl.getAclScope().equals(OzoneAcl.AclScope.ACCESS));
+
+    acl = OzoneAcl.parseAcl("group:hadoop:rwdlncxy[ACCESS]");
+    assertEquals(acl.getName(), "hadoop");
+    assertTrue(acl.getAclBitSet().get(READ.ordinal()));
+    assertTrue(acl.getAclBitSet().get(WRITE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(DELETE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(LIST.ordinal()));
+    assertTrue(acl.getAclBitSet().get(NONE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(CREATE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(READ_ACL.ordinal()));
+    assertTrue(acl.getAclBitSet().get(WRITE_ACL.ordinal()));
+    assertFalse(acl.getAclBitSet().get(ALL.ordinal()));
+    assertEquals(ACLIdentityType.GROUP, acl.getType());
+    assertTrue(acl.getAclScope().equals(OzoneAcl.AclScope.ACCESS));
+
+    acl = OzoneAcl.parseAcl("world::rwdlncxy[DEFAULT]");
+    assertEquals(acl.getName(), "WORLD");
+    assertTrue(acl.getAclBitSet().get(READ.ordinal()));
+    assertTrue(acl.getAclBitSet().get(WRITE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(DELETE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(LIST.ordinal()));
+    assertTrue(acl.getAclBitSet().get(NONE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(CREATE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(READ_ACL.ordinal()));
+    assertTrue(acl.getAclBitSet().get(WRITE_ACL.ordinal()));
+    assertFalse(acl.getAclBitSet().get(ALL.ordinal()));
+    assertEquals(ACLIdentityType.WORLD, acl.getType());
+    assertTrue(acl.getAclScope().equals(OzoneAcl.AclScope.DEFAULT));
+
+
+
     LambdaTestUtils.intercept(IllegalArgumentException.class, "ACL right" +
-            " is not", () -> OzoneAcl.parseAcl("world::rwdlncxncxdfsfgbny"));
+            " is not", () -> OzoneAcl.parseAcl("world::rwdlncxncxdfsfgbny"
+    ));
   }
 
   @Test
@@ -246,6 +304,26 @@ public class TestOzoneAcls {
     assertFalse(rights.contains(READ));
     rights = acls.get(1).getAclList();
     assertTrue(rights.contains(ALL));
+
+    acls = OzoneAcl.parseAcls("user:bilbo:cxy[ACCESS]," +
+        "group:hadoop:a[DEFAULT],world::r[DEFAULT]");
+    assertTrue(acls.size() == 3);
+    rights = acls.get(0).getAclList();
+    assertTrue(rights.size() == 3);
+    assertTrue(rights.contains(CREATE));
+    assertTrue(rights.contains(READ_ACL));
+    assertTrue(rights.contains(WRITE_ACL));
+    assertFalse(rights.contains(WRITE));
+    assertFalse(rights.contains(READ));
+    rights = acls.get(1).getAclList();
+    assertTrue(rights.contains(ALL));
+
+    assertTrue(acls.get(0).getName().equals("bilbo"));
+    assertTrue(acls.get(1).getName().equals("hadoop"));
+    assertTrue(acls.get(2).getName().equals("WORLD"));
+    assertTrue(acls.get(0).getAclScope().equals(OzoneAcl.AclScope.ACCESS));
+    assertTrue(acls.get(1).getAclScope().equals(OzoneAcl.AclScope.DEFAULT));
+    assertTrue(acls.get(2).getAclScope().equals(OzoneAcl.AclScope.DEFAULT));
   }
 
 }

+ 2 - 1
hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/codec/TestOmPrefixInfoCodec.java

@@ -33,6 +33,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.LinkedList;
 import java.util.List;
 
+import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -81,7 +82,7 @@ public class TestOmPrefixInfoCodec {
 
     List<OzoneAcl> acls = new LinkedList<>();
     OzoneAcl ozoneAcl = new OzoneAcl(ACLIdentityType.USER,
-        "hive", ACLType.ALL);
+        "hive", ACLType.ALL, ACCESS);
     acls.add(ozoneAcl);
     OmPrefixInfo opiSave = OmPrefixInfo.newBuilder()
         .setName("/user/hive/warehouse")

+ 18 - 18
hadoop-ozone/dist/src/main/smoketest/basic/ozone-shell.robot

@@ -74,33 +74,33 @@ Test Volume Acls
     [arguments]     ${protocol}         ${server}       ${volume}
     Execute         ozone sh volume create ${protocol}${server}/${volume}
     ${result} =     Execute             ozone sh volume getacl ${protocol}${server}/${volume}
-    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \".*\",\n.*\"aclList\" : . \"ALL\" .
-    ${result} =     Execute             ozone sh volume addacl ${protocol}${server}/${volume} -a user:superuser1:rwxy
+    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \".*\",\n.*\"aclScope\" : \"ACCESS\",\n.*\"aclList\" : . \"ALL\" .
+    ${result} =     Execute             ozone sh volume addacl ${protocol}${server}/${volume} -a user:superuser1:rwxy[DEFAULT]
     ${result} =     Execute             ozone sh volume getacl ${protocol}${server}/${volume}
-    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1*\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
+    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1*\",\n.*\"aclScope\" : \"DEFAULT\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\" .
     ${result} =     Execute             ozone sh volume removeacl ${protocol}${server}/${volume} -a user:superuser1:xy
     ${result} =     Execute             ozone sh volume getacl ${protocol}${server}/${volume}
-    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1\",\n.*\"aclList\" : . \"READ\", \"WRITE\"
-    ${result} =     Execute             ozone sh volume setacl ${protocol}${server}/${volume} -al user:superuser1:rwxy,group:superuser1:a,user:testuser/scm@EXAMPLE.COM:rwxyc
+    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1\",\n.*\"aclScope\" : \"DEFAULT\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\" .
+    ${result} =     Execute             ozone sh volume setacl ${protocol}${server}/${volume} -al user:superuser1:rwxy,group:superuser1:a,user:testuser/scm@EXAMPLE.COM:rwxyc,group:superuser1:a[DEFAULT]
     ${result} =     Execute             ozone sh volume getacl ${protocol}${server}/${volume}
-    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1*\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
-    Should Match Regexp                 ${result}       \"type\" : \"GROUP\",\n.*\"name\" : \"superuser1\",\n.*\"aclList\" : . \"ALL\"
+    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1*\",\n.*\"aclScope\" : \"DEFAULT\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\" .
+    Should Match Regexp                 ${result}       \"type\" : \"GROUP\",\n.*\"name\" : \"superuser1\",\n.*\"aclScope\" : \"DEFAULT\",\n.*\"aclList\" : . \"ALL\" .
 
 Test Bucket Acls
     [arguments]     ${protocol}         ${server}       ${volume}
     Execute             ozone sh bucket create ${protocol}${server}/${volume}/bb1
     ${result} =     Execute             ozone sh bucket getacl ${protocol}${server}/${volume}/bb1
-    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \".*\",\n.*\"aclList\" : . \"ALL\" .
+    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \".*\",\n.*\"aclScope\" : \"ACCESS\",\n.*\"aclList\" : . \"ALL\" .
     ${result} =     Execute             ozone sh bucket addacl ${protocol}${server}/${volume}/bb1 -a user:superuser1:rwxy
     ${result} =     Execute             ozone sh bucket getacl ${protocol}${server}/${volume}/bb1
-    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1*\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
+    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1*\",\n.*\"aclScope\" : \"ACCESS\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
     ${result} =     Execute             ozone sh bucket removeacl ${protocol}${server}/${volume}/bb1 -a user:superuser1:xy
     ${result} =     Execute             ozone sh bucket getacl ${protocol}${server}/${volume}/bb1
-    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1\",\n.*\"aclList\" : . \"READ\", \"WRITE\"
-    ${result} =     Execute             ozone sh bucket setacl ${protocol}${server}/${volume}/bb1 -al user:superuser1:rwxy,group:superuser1:a,user:testuser/scm@EXAMPLE.COM:rwxyc
+    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1\",\n.*\"aclScope\" : \"ACCESS\",\n.*\"aclList\" : . \"READ\", \"WRITE\"
+    ${result} =     Execute             ozone sh bucket setacl ${protocol}${server}/${volume}/bb1 -al user:superuser1:rwxy,group:superuser1:a,user:testuser/scm@EXAMPLE.COM:rwxyc,group:superuser1:a[DEFAULT]
     ${result} =     Execute             ozone sh bucket getacl ${protocol}${server}/${volume}/bb1
-    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1*\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
-    Should Match Regexp                 ${result}       \"type\" : \"GROUP\",\n.*\"name\" : \"superuser1\",\n.*\"aclList\" : . \"ALL\"
+    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1*\",\n.*\"aclScope\" : \"ACCESS\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
+    Should Match Regexp                 ${result}       \"type\" : \"GROUP\",\n.*\"name\" : \"superuser1\",\n.*\"aclScope\" : \"DEFAULT\",\n.*\"aclList\" : . \"ALL\" .
 
 
 Test key handling
@@ -122,14 +122,14 @@ Test key Acls
     [arguments]     ${protocol}         ${server}       ${volume}
     Execute         ozone sh key put ${protocol}${server}/${volume}/bb1/key2 /opt/hadoop/NOTICE.txt
     ${result} =     Execute             ozone sh key getacl ${protocol}${server}/${volume}/bb1/key2
-    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \".*\",\n.*\"aclList\" : . \"ALL\" .
+    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \".*\",\n.*\"aclScope\" : \"ACCESS\",\n.*\"aclList\" : . \"ALL\" .
     ${result} =     Execute             ozone sh key addacl ${protocol}${server}/${volume}/bb1/key2 -a user:superuser1:rwxy
     ${result} =     Execute             ozone sh key getacl ${protocol}${server}/${volume}/bb1/key2
-    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1*\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
+    Should Match Regexp                 ${result}       \"type\" : \"GROUP\",\n.*\"name\" : \"superuser1*\",\n.*\"aclScope\" : \"ACCESS\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
     ${result} =     Execute             ozone sh key removeacl ${protocol}${server}/${volume}/bb1/key2 -a user:superuser1:xy
     ${result} =     Execute             ozone sh key getacl ${protocol}${server}/${volume}/bb1/key2
-    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1\",\n.*\"aclList\" : . \"READ\", \"WRITE\"
+    Should Match Regexp                 ${result}       \"type\" : \"GROUP\",\n.*\"name\" : \"superuser1\",\n.*\"aclScope\" : \"ACCESS\",\n.*\"aclList\" : . \"READ\", \"WRITE\"
     ${result} =     Execute             ozone sh key setacl ${protocol}${server}/${volume}/bb1/key2 -al user:superuser1:rwxy,group:superuser1:a,user:testuser/scm@EXAMPLE.COM:rwxyc
     ${result} =     Execute             ozone sh key getacl ${protocol}${server}/${volume}/bb1/key2
-    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1*\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
-    Should Match Regexp                 ${result}       \"type\" : \"GROUP\",\n.*\"name\" : \"superuser1\",\n.*\"aclList\" : . \"ALL\"
+    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1*\",\n.*\"aclScope\" : \"ACCESS\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
+    Should Match Regexp                 ${result}       \"type\" : \"GROUP\",\n.*\"name\" : \"superuser1\",\n.*\"aclScope\" : \"ACCESS\",\n.*\"aclList\" : . \"ALL\" .

+ 18 - 18
hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-fs.robot

@@ -53,48 +53,48 @@ Test Volume Acls
     ${result} =     Execute             ozone sh volume create ${volume3}
                     Should not contain  ${result}       Failed
     ${result} =     Execute             ozone sh volume getacl ${volume3}
-    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \".*\",\n.*\"aclList\" : . \"ALL\" .
-    ${result} =     Execute             ozone sh volume addacl ${volume3} -a user:superuser1:rwxy
+    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \".*\",\n.*"aclScope\" : \"ACCESS\",\n.*\"aclList\" : . \"ALL\" .
+    ${result} =     Execute             ozone sh volume addacl ${volume3} -a user:superuser1:rwxy[DEFAULT]
     ${result} =     Execute             ozone sh volume getacl ${volume3}
-    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1*\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
+    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1\",\n.*\"aclScope\" : \"DEFAULT\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
     ${result} =     Execute             ozone sh volume removeacl ${volume3} -a user:superuser1:xy
     ${result} =     Execute             ozone sh volume getacl ${volume3}
-    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1\",\n.*\"aclList\" : . \"READ\", \"WRITE\"
-    ${result} =     Execute             ozone sh volume setacl ${volume3} -al user:superuser1:rwxy,user:testuser/scm@EXAMPLE.COM:rwxyc,group:superuser1:a
+    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1\",\n.*\"aclScope\" : \"DEFAULT\",\n.*\"aclList\" : . \"READ\", \"WRITE\"
+    ${result} =     Execute             ozone sh volume setacl ${volume3} -al user:superuser1:rwxy,user:testuser/scm@EXAMPLE.COM:rwxyc,group:superuser1:a[DEFAULT]
     ${result} =     Execute             ozone sh volume getacl ${volume3}
-    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1*\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
-    Should Match Regexp                 ${result}       \"type\" : \"GROUP\",\n.*\"name\" : \"superuser1\",\n.*\"aclList\" : . \"ALL\"
+    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1\",\n.*\"aclScope\" : \"ACCESS\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
+    Should Match Regexp                 ${result}       \"type\" : \"GROUP\",\n.*\"name\" : \"superuser1\",\n.*\"aclScope\" : \"DEFAULT\",\n.*\"aclList\" : . \"ALL\"
 
 Test Bucket Acls
     ${result} =     Execute             ozone sh bucket create ${volume3}/bk1
                     Should not contain  ${result}       Failed
     ${result} =     Execute             ozone sh bucket getacl ${volume3}/bk1
-    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \".*\",\n.*\"aclList\" : . \"ALL\" .
+    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \".*\",\n.*\"aclScope\" : \"ACCESS\",\n.*\"aclList\" : . \"ALL\" .
     ${result} =     Execute             ozone sh bucket addacl ${volume3}/bk1 -a user:superuser1:rwxy
     ${result} =     Execute             ozone sh bucket getacl ${volume3}/bk1
-    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1*\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
+    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1\",\n.*\"aclScope\" : \"ACCESS\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
     ${result} =     Execute             ozone sh bucket removeacl ${volume3}/bk1 -a user:superuser1:xy
     ${result} =     Execute             ozone sh bucket getacl ${volume3}/bk1
-    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1\",\n.*\"aclList\" : . \"READ\", \"WRITE\"
-    ${result} =     Execute             ozone sh bucket setacl ${volume3}/bk1 -al user:superuser1:rwxy,group:superuser1:a,user:testuser/scm@EXAMPLE.COM:rwxyc
+    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1\",\n.*\"aclScope\" : \"ACCESS\",\n.*\"aclList\" : . \"READ\", \"WRITE\"
+    ${result} =     Execute             ozone sh bucket setacl ${volume3}/bk1 -al user:superuser1:rwxy,group:superuser1:a,user:testuser/scm@EXAMPLE.COM:rwxyc,group:superuser1:a[DEFAULT]
     ${result} =     Execute             ozone sh bucket getacl ${volume3}/bk1
-    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1*\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
-    Should Match Regexp                 ${result}       \"type\" : \"GROUP\",\n.*\"name\" : \"superuser1\",\n.*\"aclList\" : . \"ALL\"
+    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1\",\n.*\"aclScope\" : \"ACCESS\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
+    Should Match Regexp                 ${result}       \"type\" : \"GROUP\",\n.*\"name\" : \"superuser1\",\n.*\"aclScope\" : \"ACCESS\",\n.*\"aclList\" : . \"ALL\"
 
 Test key Acls
     Execute            ozone sh key put ${volume3}/bk1/key1 /opt/hadoop/NOTICE.txt
     ${result} =     Execute             ozone sh key getacl ${volume3}/bk1/key1
-    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \".*\",\n.*\"aclList\" : . \"ALL\" .
+    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \".*\",\n.*\"aclScope\" : \"ACCESS\",\n.*\"aclList\" : . \"ALL\" .
     ${result} =     Execute             ozone sh key addacl ${volume3}/bk1/key1 -a user:superuser1:rwxy
     ${result} =     Execute             ozone sh key getacl ${volume3}/bk1/key1
-    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1*\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
+    Should Match Regexp                 ${result}       \"type\" : \"GROUP\",\n.*\"name\" : \"superuser1\",\n.*\"aclScope\" : \"ACCESS\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
     ${result} =     Execute             ozone sh key removeacl ${volume3}/bk1/key1 -a user:superuser1:xy
     ${result} =     Execute             ozone sh key getacl ${volume3}/bk1/key1
-    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1\",\n.*\"aclList\" : . \"READ\", \"WRITE\"
+    Should Match Regexp                 ${result}       \"type\" : \"GROUP\",\n.*\"name\" : \"superuser1\",\n.*\"aclScope\" : \"ACCESS\",\n.*\"aclList\" : . \"READ\", \"WRITE\"
     ${result} =     Execute             ozone sh key setacl ${volume3}/bk1/key1 -al user:superuser1:rwxy,group:superuser1:a,user:testuser/scm@EXAMPLE.COM:rwxyc
     ${result} =     Execute             ozone sh key getacl ${volume3}/bk1/key1
-    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1*\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
-    Should Match Regexp                 ${result}       \"type\" : \"GROUP\",\n.*\"name\" : \"superuser1\",\n.*\"aclList\" : . \"ALL\"
+    Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1\",\n.*\"aclScope\" : \"ACCESS\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
+    Should Match Regexp                 ${result}       \"type\" : \"GROUP\",\n.*\"name\" : \"superuser1\",\n.*\"aclScope\" : \"ACCESS\",\n.*\"aclList\" : . \"ALL\"
 
 Test native authorizer
     Execute         ozone sh volume removeacl ${volume3} -a group:root:a

+ 144 - 32
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java

@@ -32,6 +32,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.client.OzoneQuota;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
@@ -85,7 +86,6 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
 import org.apache.hadoop.ozone.s3.util.OzoneS3Util;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
 import org.apache.hadoop.ozone.security.acl.OzoneAclConfig;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
@@ -103,6 +103,11 @@ import org.apache.commons.lang3.RandomUtils;
 import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
 import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
 import static org.apache.hadoop.hdds.client.ReplicationType.STAND_ALONE;
+import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
+import static org.apache.hadoop.ozone.OzoneAcl.AclScope.DEFAULT;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.GROUP;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.USER;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.READ;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.either;
 
@@ -137,6 +142,15 @@ public abstract class TestOzoneRpcClientAbstract {
   private static OzoneManager ozoneManager;
   private static StorageContainerLocationProtocolClientSideTranslatorPB
       storageContainerLocationClient;
+  private static String remoteUserName = "remoteUser";
+  private static OzoneAcl defaultUserAcl = new OzoneAcl(USER, remoteUserName,
+      READ, DEFAULT);
+  private static OzoneAcl defaultGroupAcl = new OzoneAcl(GROUP, remoteUserName,
+      READ, DEFAULT);
+  private static OzoneAcl inheritedUserAcl = new OzoneAcl(USER, remoteUserName,
+      READ, ACCESS);
+  private static OzoneAcl inheritedGroupAcl = new OzoneAcl(GROUP,
+      remoteUserName, READ, ACCESS);
 
   private static String scmId = UUID.randomUUID().toString();
 
@@ -439,8 +453,8 @@ public abstract class TestOzoneRpcClientAbstract {
       throws IOException, OzoneException {
     String volumeName = UUID.randomUUID().toString();
     String bucketName = UUID.randomUUID().toString();
-    OzoneAcl userAcl = new OzoneAcl(ACLIdentityType.USER, "test",
-        ACLType.READ);
+    OzoneAcl userAcl = new OzoneAcl(USER, "test",
+        READ, ACCESS);
     List<OzoneAcl> acls = new ArrayList<>();
     acls.add(userAcl);
     store.createVolume(volumeName);
@@ -458,8 +472,8 @@ public abstract class TestOzoneRpcClientAbstract {
       throws IOException, OzoneException {
     String volumeName = UUID.randomUUID().toString();
     String bucketName = UUID.randomUUID().toString();
-    OzoneAcl userAcl = new OzoneAcl(ACLIdentityType.USER, "test",
-        ACLType.ALL);
+    OzoneAcl userAcl = new OzoneAcl(USER, "test",
+        ACLType.ALL, ACCESS);
     List<OzoneAcl> acls = new ArrayList<>();
     acls.add(userAcl);
     store.createVolume(volumeName);
@@ -499,7 +513,7 @@ public abstract class TestOzoneRpcClientAbstract {
     OzoneVolume volume = store.getVolume(volumeName);
     volume.createBucket(bucketName);
     List<OzoneAcl> acls = new ArrayList<>();
-    acls.add(new OzoneAcl(ACLIdentityType.USER, "test", ACLType.ALL));
+    acls.add(new OzoneAcl(USER, "test", ACLType.ALL, ACCESS));
     OzoneBucket bucket = volume.getBucket(bucketName);
     bucket.addAcls(acls);
     OzoneBucket newBucket = volume.getBucket(bucketName);
@@ -512,8 +526,8 @@ public abstract class TestOzoneRpcClientAbstract {
       throws IOException, OzoneException {
     String volumeName = UUID.randomUUID().toString();
     String bucketName = UUID.randomUUID().toString();
-    OzoneAcl userAcl = new OzoneAcl(ACLIdentityType.USER, "test",
-        ACLType.ALL);
+    OzoneAcl userAcl = new OzoneAcl(USER, "test",
+        ACLType.ALL, ACCESS);
     List<OzoneAcl> acls = new ArrayList<>();
     acls.add(userAcl);
     store.createVolume(volumeName);
@@ -2231,7 +2245,7 @@ public abstract class TestOzoneRpcClientAbstract {
         .setStoreType(OzoneObj.StoreType.OZONE)
         .build();
 
-    validateOzoneAcl(ozObj);
+    validateOzoneAccessAcl(ozObj);
   }
 
   @Test
@@ -2252,15 +2266,54 @@ public abstract class TestOzoneRpcClientAbstract {
         .setStoreType(OzoneObj.StoreType.OZONE)
         .build();
 
-    validateOzoneAcl(ozObj);
+    validateOzoneAccessAcl(ozObj);
+
+    OzoneObj volObj = new OzoneObjInfo.Builder()
+        .setVolumeName(volumeName)
+        .setResType(OzoneObj.ResourceType.VOLUME)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .build();
+    validateDefaultAcls(volObj, ozObj, volume, null);
+  }
+
+  private void validateDefaultAcls(OzoneObj parentObj, OzoneObj childObj,
+      OzoneVolume volume,  OzoneBucket bucket) throws Exception {
+    assertTrue(store.addAcl(parentObj, defaultUserAcl));
+    assertTrue(store.addAcl(parentObj, defaultGroupAcl));
+    if (volume != null) {
+      volume.deleteBucket(childObj.getBucketName());
+      volume.createBucket(childObj.getBucketName());
+    } else {
+      if (childObj.getResourceType().equals(OzoneObj.ResourceType.KEY)) {
+        bucket.deleteKey(childObj.getKeyName());
+        writeKey(childObj.getKeyName(), bucket);
+      } else {
+        store.setAcl(childObj, getAclList(new OzoneConfiguration()));
+      }
+    }
+    List<OzoneAcl> acls = store.getAcl(parentObj);
+    assertTrue("Current acls:" + StringUtils.join(",", acls) +
+            " inheritedUserAcl:" + inheritedUserAcl,
+        acls.contains(defaultUserAcl));
+    assertTrue("Current acls:" + StringUtils.join(",", acls) +
+            " inheritedUserAcl:" + inheritedUserAcl,
+        acls.contains(defaultGroupAcl));
+
+    acls = store.getAcl(childObj);
+    assertTrue("Current acls:" + StringUtils.join(",", acls) +
+            " inheritedUserAcl:" + inheritedUserAcl,
+        acls.contains(inheritedUserAcl));
+    assertTrue("Current acls:" + StringUtils.join(",", acls) +
+            " inheritedUserAcl:" + inheritedUserAcl,
+        acls.contains(inheritedGroupAcl));
   }
 
   @Test
   public void testNativeAclsForKey() throws Exception {
     String volumeName = UUID.randomUUID().toString();
     String bucketName = UUID.randomUUID().toString();
-    String key1 = UUID.randomUUID().toString();
-    String key2 = UUID.randomUUID().toString();
+    String key1 = "dir1/dir2" + UUID.randomUUID().toString();
+    String key2 = "dir1/dir2" + UUID.randomUUID().toString();
 
     store.createVolume(volumeName);
     OzoneVolume volume = store.getVolume(volumeName);
@@ -2279,7 +2332,42 @@ public abstract class TestOzoneRpcClientAbstract {
         .setStoreType(OzoneObj.StoreType.OZONE)
         .build();
 
-    validateOzoneAcl(ozObj);
+    // Validates access acls.
+    validateOzoneAccessAcl(ozObj);
+
+    // Check default acls inherited from bucket.
+    OzoneObj buckObj = new OzoneObjInfo.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(key1)
+        .setResType(OzoneObj.ResourceType.BUCKET)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .build();
+
+    validateDefaultAcls(buckObj, ozObj, null, bucket);
+
+    // Check default acls inherited from prefix.
+    OzoneObj prefixObj = new OzoneObjInfo.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(key1)
+        .setPrefixName("dir1/")
+        .setResType(OzoneObj.ResourceType.PREFIX)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .build();
+    store.setAcl(prefixObj, getAclList(new OzoneConfiguration()));
+    // Prefix should inherit DEFAULT acl from bucket.
+
+    List<OzoneAcl> acls = store.getAcl(prefixObj);
+    assertTrue("Current acls:" + StringUtils.join(",", acls),
+        acls.contains(inheritedUserAcl));
+    assertTrue("Current acls:" + StringUtils.join(",", acls),
+        acls.contains(inheritedGroupAcl));
+    // Remove inherited acls from prefix.
+    assertTrue(store.removeAcl(prefixObj, inheritedUserAcl));
+    assertTrue(store.removeAcl(prefixObj, inheritedGroupAcl));
+
+    validateDefaultAcls(prefixObj, ozObj, null, bucket);
   }
 
   @Test
@@ -2302,7 +2390,7 @@ public abstract class TestOzoneRpcClientAbstract {
     writeKey(key1, bucket);
     writeKey(key2, bucket);
 
-    OzoneObj ozObj = new OzoneObjInfo.Builder()
+    OzoneObj prefixObj = new OzoneObjInfo.Builder()
         .setVolumeName(volumeName)
         .setBucketName(bucketName)
         .setPrefixName(prefix1)
@@ -2310,36 +2398,58 @@ public abstract class TestOzoneRpcClientAbstract {
         .setStoreType(OzoneObj.StoreType.OZONE)
         .build();
 
+    OzoneObj prefixObj2 = new OzoneObjInfo.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setPrefixName(prefix2)
+        .setResType(OzoneObj.ResourceType.PREFIX)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .build();
+
     // add acl
     BitSet aclRights1 = new BitSet();
-    aclRights1.set(ACLType.READ.ordinal());
-    OzoneAcl user1Acl = new OzoneAcl(ACLIdentityType.USER,
-        "user1", aclRights1);
-    assertTrue(store.addAcl(ozObj, user1Acl));
+    aclRights1.set(READ.ordinal());
+    OzoneAcl user1Acl = new OzoneAcl(USER,
+        "user1", aclRights1, ACCESS);
+    assertTrue(store.addAcl(prefixObj, user1Acl));
 
     // get acl
-    List<OzoneAcl> aclsGet = store.getAcl(ozObj);
+    List<OzoneAcl> aclsGet = store.getAcl(prefixObj);
     Assert.assertEquals(1, aclsGet.size());
     Assert.assertEquals(user1Acl, aclsGet.get(0));
 
     // remove acl
-    Assert.assertTrue(store.removeAcl(ozObj, user1Acl));
-    aclsGet = store.getAcl(ozObj);
+    Assert.assertTrue(store.removeAcl(prefixObj, user1Acl));
+    aclsGet = store.getAcl(prefixObj);
     Assert.assertEquals(0, aclsGet.size());
 
     // set acl
     BitSet aclRights2 = new BitSet();
     aclRights2.set(ACLType.ALL.ordinal());
-    OzoneAcl group1Acl = new OzoneAcl(ACLIdentityType.GROUP,
-        "group1", aclRights2);
+    OzoneAcl group1Acl = new OzoneAcl(GROUP,
+        "group1", aclRights2, ACCESS);
     List<OzoneAcl> acls = new ArrayList<>();
     acls.add(user1Acl);
     acls.add(group1Acl);
-    Assert.assertTrue(store.setAcl(ozObj, acls));
+    Assert.assertTrue(store.setAcl(prefixObj, acls));
 
     // get acl
-    aclsGet = store.getAcl(ozObj);
+    aclsGet = store.getAcl(prefixObj);
     Assert.assertEquals(2, aclsGet.size());
+
+    OzoneObj keyObj = new OzoneObjInfo.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(key1)
+        .setResType(OzoneObj.ResourceType.KEY)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .build();
+
+    // Check default acls inherited from prefix.
+    validateDefaultAcls(prefixObj, keyObj, null, bucket);
+
+    // Check default acls inherited from bucket when prefix does not exist.
+    validateDefaultAcls(prefixObj2, keyObj, null, bucket);
   }
 
   /**
@@ -2357,12 +2467,12 @@ public abstract class TestOzoneRpcClientAbstract {
     ACLType userRights = aclConfig.getUserDefaultRights();
     ACLType groupRights = aclConfig.getGroupDefaultRights();
 
-    listOfAcls.add(new OzoneAcl(ACLIdentityType.USER,
-        ugi.getUserName(), userRights));
+    listOfAcls.add(new OzoneAcl(USER,
+        ugi.getUserName(), userRights, ACCESS));
     //Group ACLs of the User
     List<String> userGroups = Arrays.asList(ugi.getGroupNames());
     userGroups.stream().forEach((group) -> listOfAcls.add(
-        new OzoneAcl(ACLIdentityType.GROUP, group, groupRights)));
+        new OzoneAcl(GROUP, group, groupRights, ACCESS)));
     return listOfAcls;
   }
 
@@ -2370,7 +2480,7 @@ public abstract class TestOzoneRpcClientAbstract {
    * Helper function to validate ozone Acl for given object.
    * @param ozObj
    * */
-  private void validateOzoneAcl(OzoneObj ozObj) throws IOException {
+  private void validateOzoneAccessAcl(OzoneObj ozObj) throws IOException {
     // Get acls for volume.
     List<OzoneAcl> expectedAcls = getAclList(new OzoneConfiguration());
 
@@ -2378,7 +2488,7 @@ public abstract class TestOzoneRpcClientAbstract {
     if(expectedAcls.size()>0) {
       OzoneAcl oldAcl = expectedAcls.get(0);
       OzoneAcl newAcl = new OzoneAcl(oldAcl.getType(), oldAcl.getName(),
-          ACLType.READ_ACL);
+          ACLType.READ_ACL, ACCESS);
       // Verify that operation successful.
       assertTrue(store.addAcl(ozObj, newAcl));
       List<OzoneAcl> acls = store.getAcl(ozObj);
@@ -2433,8 +2543,10 @@ public abstract class TestOzoneRpcClientAbstract {
     expectedAcls.forEach(a -> assertTrue(finalNewAcls.contains(a)));
 
     // Reset acl's.
-    OzoneAcl ua = new OzoneAcl(ACLIdentityType.USER, "userx", ACLType.READ_ACL);
-    OzoneAcl ug = new OzoneAcl(ACLIdentityType.GROUP, "userx", ACLType.ALL);
+    OzoneAcl ua = new OzoneAcl(USER, "userx",
+        ACLType.READ_ACL, ACCESS);
+    OzoneAcl ug = new OzoneAcl(GROUP, "userx",
+        ACLType.ALL, ACCESS);
     store.setAcl(ozObj, Arrays.asList(ua, ug));
     newAcls = store.getAcl(ozObj);
     assertTrue(newAcls.size() == 2);

+ 8 - 7
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java

@@ -89,6 +89,7 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.mockito.Mockito;
 
+import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
@@ -407,7 +408,7 @@ public class TestKeyManagerImpl {
         .build();
 
     OzoneAcl ozAcl1 = new OzoneAcl(ACLIdentityType.USER, "user1",
-        ACLType.READ);
+        ACLType.READ, ACCESS);
     prefixManager.addAcl(ozPrefix1, ozAcl1);
 
     List<OzoneAcl> ozAclGet = prefixManager.getAcl(ozPrefix1);
@@ -416,23 +417,23 @@ public class TestKeyManagerImpl {
 
     List<OzoneAcl> acls = new ArrayList<>();
     OzoneAcl ozAcl2 = new OzoneAcl(ACLIdentityType.USER, "admin",
-        ACLType.ALL);
+        ACLType.ALL, ACCESS);
 
     BitSet rwRights = new BitSet();
     rwRights.set(IAccessAuthorizer.ACLType.WRITE.ordinal());
     rwRights.set(IAccessAuthorizer.ACLType.READ.ordinal());
     OzoneAcl ozAcl3 = new OzoneAcl(ACLIdentityType.GROUP, "dev",
-        rwRights);
+        rwRights, ACCESS);
 
     BitSet wRights = new BitSet();
     wRights.set(IAccessAuthorizer.ACLType.WRITE.ordinal());
     OzoneAcl ozAcl4 = new OzoneAcl(ACLIdentityType.GROUP, "dev",
-        wRights);
+        wRights, ACCESS);
 
     BitSet rRights = new BitSet();
     rRights.set(IAccessAuthorizer.ACLType.READ.ordinal());
     OzoneAcl ozAcl5 = new OzoneAcl(ACLIdentityType.GROUP, "dev",
-        rRights);
+        rRights, ACCESS);
 
     acls.add(ozAcl2);
     acls.add(ozAcl3);
@@ -500,7 +501,7 @@ public class TestKeyManagerImpl {
     // Invalid prefix not ending with "/"
     String invalidPrefix = "invalid/pf";
     OzoneAcl ozAcl1 = new OzoneAcl(ACLIdentityType.USER, "user1",
-        ACLType.READ);
+        ACLType.READ, ACCESS);
 
     OzoneObj ozInvalidPrefix = new OzoneObjInfo.Builder()
         .setVolumeName(volumeName)
@@ -564,7 +565,7 @@ public class TestKeyManagerImpl {
         .build();
 
     OzoneAcl ozAcl1 = new OzoneAcl(ACLIdentityType.USER, "user1",
-        ACLType.READ);
+        ACLType.READ, ACCESS);
     prefixManager.addAcl(ozPrefix1, ozAcl1);
 
     OzoneObj ozFile1 = new OzoneObjInfo.Builder()

+ 6 - 5
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java

@@ -88,6 +88,7 @@ import org.apache.hadoop.utils.db.TableIterator;
 
 import org.apache.commons.lang3.RandomStringUtils;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
@@ -372,28 +373,28 @@ public class TestOzoneManager {
     storageHandler.createVolume(createVolumeArgs);
 
     OzoneAcl userAcl = new OzoneAcl(ACLIdentityType.USER, userName,
-        ACLType.READ);
+        ACLType.READ, ACCESS);
     Assert.assertTrue(storageHandler.checkVolumeAccess(volumeName, userAcl));
     OzoneAcl group = new OzoneAcl(ACLIdentityType.GROUP, groupName[0],
-        ACLType.READ);
+        ACLType.READ, ACCESS);
     Assert.assertTrue(storageHandler.checkVolumeAccess(volumeName, group));
 
     // Create a different user and access should fail
     String falseUserName = "user" + RandomStringUtils.randomNumeric(5);
     OzoneAcl falseUserAcl =
         new OzoneAcl(ACLIdentityType.USER, falseUserName,
-            ACLType.ALL);
+            ACLType.ALL, ACCESS);
     Assert.assertFalse(storageHandler
         .checkVolumeAccess(volumeName, falseUserAcl));
     // Checking access with user name and Group Type should fail
     OzoneAcl falseGroupAcl = new OzoneAcl(ACLIdentityType.GROUP, userName,
-        ACLType.ALL);
+        ACLType.ALL, ACCESS);
     Assert.assertFalse(storageHandler
         .checkVolumeAccess(volumeName, falseGroupAcl));
 
     // Access for acl type world should also fail
     OzoneAcl worldAcl =
-        new OzoneAcl(ACLIdentityType.WORLD, "", ACLType.READ);
+        new OzoneAcl(ACLIdentityType.WORLD, "", ACLType.READ, ACCESS);
     Assert.assertFalse(storageHandler.checkVolumeAccess(volumeName, worldAcl));
 
     Assert.assertEquals(0, omMetrics.getNumVolumeCheckAccessFails());

+ 13 - 8
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneNativeAuthorizer.java

@@ -57,6 +57,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
@@ -242,9 +243,10 @@ public class TestOzoneNativeAuthorizer {
   @Test
   public void testCheckAccessForBucket() throws Exception {
 
-    OzoneAcl userAcl = new OzoneAcl(USER, ugi.getUserName(), parentDirUserAcl);
+    OzoneAcl userAcl = new OzoneAcl(USER, ugi.getUserName(), parentDirUserAcl,
+        ACCESS);
     OzoneAcl groupAcl = new OzoneAcl(GROUP, ugi.getGroups().size() > 0 ?
-        ugi.getGroups().get(0) : "", parentDirGroupAcl);
+        ugi.getGroups().get(0) : "", parentDirGroupAcl, ACCESS);
     // Set access for volume.
     volumeManager.setAcl(volObj, Arrays.asList(userAcl, groupAcl));
 
@@ -256,9 +258,10 @@ public class TestOzoneNativeAuthorizer {
 
   @Test
   public void testCheckAccessForKey() throws Exception {
-    OzoneAcl userAcl = new OzoneAcl(USER, ugi.getUserName(), parentDirUserAcl);
+    OzoneAcl userAcl = new OzoneAcl(USER, ugi.getUserName(), parentDirUserAcl,
+        ACCESS);
     OzoneAcl groupAcl = new OzoneAcl(GROUP, ugi.getGroups().size() > 0 ?
-        ugi.getGroups().get(0) : "", parentDirGroupAcl);
+        ugi.getGroups().get(0) : "", parentDirGroupAcl, ACCESS);
     // Set access for volume, bucket & prefix.
     volumeManager.setAcl(volObj, Arrays.asList(userAcl, groupAcl));
     bucketManager.setAcl(buckObj, Arrays.asList(userAcl, groupAcl));
@@ -280,9 +283,10 @@ public class TestOzoneNativeAuthorizer {
         .setStoreType(OZONE)
         .build();
 
-    OzoneAcl userAcl = new OzoneAcl(USER, ugi.getUserName(), parentDirUserAcl);
+    OzoneAcl userAcl = new OzoneAcl(USER, ugi.getUserName(), parentDirUserAcl,
+        ACCESS);
     OzoneAcl groupAcl = new OzoneAcl(GROUP, ugi.getGroups().size() > 0 ?
-        ugi.getGroups().get(0) : "", parentDirGroupAcl);
+        ugi.getGroups().get(0) : "", parentDirGroupAcl, ACCESS);
     // Set access for volume & bucket.
     volumeManager.setAcl(volObj, Arrays.asList(userAcl, groupAcl));
     bucketManager.setAcl(buckObj, Arrays.asList(userAcl, groupAcl));
@@ -321,7 +325,8 @@ public class TestOzoneNativeAuthorizer {
      *    if user/group has access to them.
      * */
     for (ACLType a1 : allAcls) {
-      OzoneAcl newAcl = new OzoneAcl(accessType, getAclName(accessType), a1);
+      OzoneAcl newAcl = new OzoneAcl(accessType, getAclName(accessType), a1,
+          ACCESS);
 
       // Reset acls to only one right.
       aclImplementor.setAcl(obj, Arrays.asList(newAcl));
@@ -375,7 +380,7 @@ public class TestOzoneNativeAuthorizer {
           ACLIdentityType identityType = ACLIdentityType.values()[type];
           // Add remaining acls one by one and then check access.
           OzoneAcl addAcl = new OzoneAcl(identityType, 
-              getAclName(identityType), a2);
+              getAclName(identityType), a2, ACCESS);
           aclImplementor.addAcl(obj, addAcl);
 
           // Fetch acls again.

+ 1 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestBuckets.java

@@ -295,7 +295,7 @@ public class TestBuckets {
     OzoneVolume vol = protocol.getVolumeDetails(volumeName);
     String[] acls = {"user:frodo:rw", "user:samwise:rw"};
     List<OzoneAcl> aclList =
-        Arrays.stream(acls).map(OzoneAcl::parseAcl)
+        Arrays.stream(acls).map(acl -> OzoneAcl.parseAcl(acl))
             .collect(Collectors.toList());
 
     long currentTime = Time.now();

+ 8 - 7
hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java

@@ -43,7 +43,6 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.OzoneConsts.Versioning;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
-import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
@@ -71,6 +70,8 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
+
 /**
  * A {@link StorageHandler} implementation that distributes object storage
  * across the nodes of an HDFS cluster.
@@ -179,19 +180,19 @@ public final class DistributedStorageHandler implements StorageHandler {
   public void createVolume(VolumeArgs args) throws IOException, OzoneException {
     long quota = args.getQuota() == null ?
         OzoneConsts.MAX_QUOTA_IN_BYTES : args.getQuota().sizeInBytes();
-    OzoneAcl userAcl =
-        new OzoneAcl(ACLIdentityType.USER, args.getUserName(), userRights);
+    OzoneAcl userAcl = new OzoneAcl(ACLIdentityType.USER, args.getUserName(),
+            userRights, ACCESS);
     OmVolumeArgs.Builder builder = OmVolumeArgs.newBuilder();
     builder.setAdminName(args.getAdminName())
         .setOwnerName(args.getUserName())
         .setVolume(args.getVolumeName())
         .setQuotaInBytes(quota)
-        .addOzoneAcls(OMPBHelper.convertOzoneAcl(userAcl));
+        .addOzoneAcls(OzoneAcl.toProtobuf(userAcl));
     if (args.getGroups() != null) {
       for (String group : args.getGroups()) {
         OzoneAcl groupAcl =
-            new OzoneAcl(ACLIdentityType.GROUP, group, groupRights);
-        builder.addOzoneAcls(OMPBHelper.convertOzoneAcl(groupAcl));
+            new OzoneAcl(ACLIdentityType.GROUP, group, groupRights, ACCESS);
+        builder.addOzoneAcls(OzoneAcl.toProtobuf(groupAcl));
       }
     }
     ozoneManagerClient.createVolume(builder.build());
@@ -215,7 +216,7 @@ public final class DistributedStorageHandler implements StorageHandler {
   public boolean checkVolumeAccess(String volume, OzoneAcl acl)
       throws IOException, OzoneException {
     return ozoneManagerClient
-        .checkVolumeAccess(volume, OMPBHelper.convertOzoneAcl(acl));
+        .checkVolumeAccess(volume, OzoneAcl.toProtobuf(acl));
   }
 
   @Override

+ 11 - 2
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.security.acl.RequestContext;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
@@ -49,6 +50,8 @@ import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCK
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INTERNAL_ERROR;
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclScope.*;
+
 /**
  * OM bucket manager.
  */
@@ -125,9 +128,10 @@ public class BucketManagerImpl implements BucketManager {
           volumeName, bucketName);
       String volumeKey = metadataManager.getVolumeKey(volumeName);
       String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
+      OmVolumeArgs volumeArgs = metadataManager.getVolumeTable().get(volumeKey);
 
       //Check if the volume exists
-      if (metadataManager.getVolumeTable().get(volumeKey) == null) {
+      if (volumeArgs == null) {
         LOG.debug("volume: {} not found ", volumeName);
         throw new OMException("Volume doesn't exist",
             OMException.ResultCodes.VOLUME_NOT_FOUND);
@@ -165,10 +169,15 @@ public class BucketManagerImpl implements BucketManager {
             .setVersion(CryptoProtocolVersion.ENCRYPTION_ZONES)
             .setSuite(CipherSuite.convert(metadata.getCipher()));
       }
+      List<OzoneAcl> acls = new ArrayList<>();
+      acls.addAll(bucketInfo.getAcls());
+      volumeArgs.getAclMap().getDefaultAclList().forEach(
+          a -> acls.add(OzoneAcl.fromProtobufWithAccessType(a)));
+
       OmBucketInfo.Builder omBucketInfoBuilder = OmBucketInfo.newBuilder()
           .setVolumeName(bucketInfo.getVolumeName())
           .setBucketName(bucketInfo.getBucketName())
-          .setAcls(bucketInfo.getAcls())
+          .setAcls(acls)
           .setStorageType(bucketInfo.getStorageType())
           .setIsVersionEnabled(bucketInfo.getIsVersionEnabled())
           .setCreationTime(Time.now())

+ 91 - 16
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java

@@ -57,6 +57,8 @@ import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
+import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -149,21 +151,24 @@ public class KeyManagerImpl implements KeyManager {
   private BackgroundService keyDeletingService;
 
   private final KeyProviderCryptoExtension kmsProvider;
+  private final PrefixManager prefixManager;
 
 
   public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
       OMMetadataManager metadataManager, OzoneConfiguration conf, String omId,
       OzoneBlockTokenSecretManager secretManager) {
     this(new ScmClient(scmBlockClient, null), metadataManager,
-        conf, omId, secretManager, null);
+        conf, omId, secretManager, null, null);
   }
 
   public KeyManagerImpl(ScmClient scmClient,
       OMMetadataManager metadataManager, OzoneConfiguration conf, String omId,
       OzoneBlockTokenSecretManager secretManager,
-      KeyProviderCryptoExtension kmsProvider) {
+      KeyProviderCryptoExtension kmsProvider,
+      PrefixManager prefixManager) {
     this.scmClient = scmClient;
     this.metadataManager = metadataManager;
+    this.prefixManager = prefixManager;
     this.scmBlockSize = (long) conf
         .getStorageSize(OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT,
             StorageUnit.BYTES);
@@ -244,16 +249,19 @@ public class KeyManagerImpl implements KeyManager {
    * @param bucketName
    * @throws IOException
    */
-  private void validateS3Bucket(String volumeName, String bucketName)
+  private OmBucketInfo validateS3Bucket(String volumeName, String bucketName)
       throws IOException {
 
     String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
+    OmBucketInfo omBucketInfo = metadataManager.getBucketTable().
+        get(bucketKey);
     //Check if bucket already exists
-    if (metadataManager.getBucketTable().get(bucketKey) == null) {
+    if (omBucketInfo == null) {
       LOG.error("bucket not found: {}/{} ", volumeName, bucketName);
       throw new OMException("Bucket not found",
           BUCKET_NOT_FOUND);
     }
+    return omBucketInfo;
   }
 
   @Override
@@ -412,8 +420,9 @@ public class KeyManagerImpl implements KeyManager {
 
     FileEncryptionInfo encInfo;
     metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
+    OmBucketInfo bucketInfo;
     try {
-      OmBucketInfo bucketInfo = getBucketInfo(volumeName, bucketName);
+      bucketInfo = getBucketInfo(volumeName, bucketName);
       encInfo = getFileEncryptionInfo(bucketInfo);
       keyInfo = prepareKeyInfo(args, dbKeyName, size, locations, encInfo);
     } catch (OMException e) {
@@ -429,7 +438,8 @@ public class KeyManagerImpl implements KeyManager {
     if (keyInfo == null) {
       // the key does not exist, create a new object, the new blocks are the
       // version 0
-      keyInfo = createKeyInfo(args, locations, factor, type, size, encInfo);
+      keyInfo = createKeyInfo(args, locations, factor, type, size,
+          encInfo, bucketInfo);
     }
     openVersion = keyInfo.getLatestVersionLocations().getVersion();
     LOG.debug("Key {} allocated in volume {} bucket {}",
@@ -506,7 +516,8 @@ public class KeyManagerImpl implements KeyManager {
     }
     // For this upload part we don't need to check in KeyTable. As this
     // is not an actual key, it is a part of the key.
-    return createKeyInfo(args, locations, factor, type, size, encInfo);
+    return createKeyInfo(args, locations, factor, type, size, encInfo,
+        getBucketInfo(args.getVolumeName(), args.getBucketName()));
   }
 
   /**
@@ -520,10 +531,11 @@ public class KeyManagerImpl implements KeyManager {
    * @return
    */
   private OmKeyInfo createKeyInfo(OmKeyArgs keyArgs,
-                                  List<OmKeyLocationInfo> locations,
-                                  ReplicationFactor factor,
-                                  ReplicationType type, long size,
-                                  FileEncryptionInfo encInfo) {
+      List<OmKeyLocationInfo> locations,
+      ReplicationFactor factor,
+      ReplicationType type, long size,
+      FileEncryptionInfo encInfo,
+      OmBucketInfo omBucketInfo) {
     OmKeyInfo.Builder builder = new OmKeyInfo.Builder()
         .setVolumeName(keyArgs.getVolumeName())
         .setBucketName(keyArgs.getBucketName())
@@ -536,10 +548,34 @@ public class KeyManagerImpl implements KeyManager {
         .setReplicationType(type)
         .setReplicationFactor(factor)
         .setFileEncryptionInfo(encInfo);
+    List<OzoneAclInfo> acls = new ArrayList<>();
     if(keyArgs.getAcls() != null) {
-      builder.setAcls(keyArgs.getAcls().stream().map(a ->
+      acls.addAll(keyArgs.getAcls().stream().map(a ->
           OzoneAcl.toProtobuf(a)).collect(Collectors.toList()));
     }
+
+    // Inherit DEFAULT acls from prefix.
+    boolean prefixParentFound = false;
+    if(prefixManager != null) {
+      List<OmPrefixInfo> prefixList = prefixManager.getLongestPrefixPath(
+          OZONE_URI_DELIMITER +
+              keyArgs.getVolumeName() + OZONE_URI_DELIMITER +
+              keyArgs.getBucketName() + OZONE_URI_DELIMITER +
+              keyArgs.getKeyName());
+
+      if(prefixList.size() > 0) {
+        // Add all acls from direct parent to key.
+        OmPrefixInfo prefixInfo = prefixList.get(prefixList.size() - 1);
+        if(prefixInfo  != null) {
+          acls.addAll(OzoneUtils.getDefaultAclsProto(prefixInfo.getAcls()));
+          prefixParentFound = true;
+        }
+      }
+    }
+    if(!prefixParentFound && omBucketInfo != null) {
+      acls.addAll(OzoneUtils.getDefaultAclsProto(omBucketInfo.getAcls()));
+    }
+    builder.setAcls(acls);
     return builder.build();
   }
 
@@ -839,7 +875,7 @@ public class KeyManagerImpl implements KeyManager {
     String keyName = keyArgs.getKeyName();
 
     metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
-    validateS3Bucket(volumeName, bucketName);
+    OmBucketInfo bucketInfo = validateS3Bucket(volumeName, bucketName);
     try {
 
       // We are adding uploadId to key, because if multiple users try to
@@ -876,8 +912,7 @@ public class KeyManagerImpl implements KeyManager {
           .setReplicationFactor(keyArgs.getFactor())
           .setOmKeyLocationInfos(Collections.singletonList(
               new OmKeyLocationInfoGroup(0, locations)))
-          .setAcls(keyArgs.getAcls().stream().map(a ->
-              OzoneAcl.toProtobuf(a)).collect(Collectors.toList()))
+          .setAcls(getAclsForKey(keyArgs, null, bucketInfo))
           .build();
       DBStore store = metadataManager.getStore();
       try (BatchOperation batch = store.initBatchOperation()) {
@@ -902,6 +937,44 @@ public class KeyManagerImpl implements KeyManager {
     }
   }
 
+  private List<OzoneAclInfo> getAclsForKey(OmKeyArgs keyArgs,
+      OmVolumeArgs volArgs, OmBucketInfo bucketInfo) {
+    List<OzoneAclInfo> acls = new ArrayList<>(keyArgs.getAcls().size());
+
+    keyArgs.getAcls().stream().map(OzoneAcl::toProtobuf).
+        collect(Collectors.toList());
+
+    // Inherit DEFAULT acls from prefix.
+    boolean prefixParentFound = false;
+    if(prefixManager != null) {
+      List<OmPrefixInfo> prefixList = prefixManager.getLongestPrefixPath(
+          OZONE_URI_DELIMITER +
+              keyArgs.getVolumeName() + OZONE_URI_DELIMITER +
+              keyArgs.getBucketName() + OZONE_URI_DELIMITER +
+              keyArgs.getKeyName());
+
+      if(prefixList.size() > 0) {
+        // Add all acls from direct parent to key.
+        OmPrefixInfo prefixInfo = prefixList.get(prefixList.size() - 1);
+        if(prefixInfo  != null) {
+          acls.addAll(OzoneUtils.getDefaultAclsProto(prefixInfo.getAcls()));
+          prefixParentFound = true;
+        }
+      }
+    }
+
+    // Inherit DEFAULT acls from bucket only if DEFAULT acls for
+    // prefix are not set.
+    if (!prefixParentFound && bucketInfo != null) {
+      acls.addAll(bucketInfo.getAcls().stream().filter(a -> a.getAclScope()
+          .equals(OzoneAcl.AclScope.DEFAULT))
+          .map(OzoneAcl::toProtobufWithAccessType)
+          .collect(Collectors.toList()));
+    }
+
+    return acls;
+  }
+
   @Override
   public OmMultipartCommitUploadPartInfo commitMultipartUploadPart(
       OmKeyArgs omKeyArgs, long clientID) throws IOException {
@@ -1159,7 +1232,7 @@ public class KeyManagerImpl implements KeyManager {
     Preconditions.checkNotNull(uploadID, "uploadID cannot be null");
     validateS3Bucket(volumeName, bucketName);
     metadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName, bucketName);
-
+    OmBucketInfo bucketInfo;
     try {
       String multipartKey = metadataManager.getMultipartKey(volumeName,
           bucketName, keyName, uploadID);
@@ -1333,6 +1406,7 @@ public class KeyManagerImpl implements KeyManager {
           newAcl = OzoneAclInfo.newBuilder()
               .setType(a.getType())
               .setName(a.getName())
+              .setAclScope(a.getAclScope())
               .setRights(ByteString.copyFrom(currentAcls.toByteArray()))
               .build();
           newAcls.remove(a);
@@ -1417,6 +1491,7 @@ public class KeyManagerImpl implements KeyManager {
             newAcl = OzoneAclInfo.newBuilder()
                 .setType(a.getType())
                 .setName(a.getName())
+                .setAclScope(a.getAclScope())
                 .setRights(ByteString.copyFrom(currentAcls.toByteArray()))
                 .build();
             newAcls.remove(a);

+ 10 - 12
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java

@@ -184,6 +184,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLE
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
@@ -310,7 +311,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       throw new OMException("OM not initialized.",
           ResultCodes.OM_NOT_INITIALIZED);
     }
-
     // Load HA related configurations
     loadOMHAConfigs(configuration);
 
@@ -402,9 +402,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     omRpcAddress = updateRPCListenAddress(configuration,
         OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
     this.scmClient = new ScmClient(scmBlockClient, scmContainerClient);
-    keyManager = new KeyManagerImpl(scmClient, metadataManager,
-        configuration, omStorage.getOmId(), blockTokenMgr, getKmsProvider());
     prefixManager = new PrefixManagerImpl(metadataManager);
+    keyManager = new KeyManagerImpl(scmClient, metadataManager,
+        configuration, omStorage.getOmId(), blockTokenMgr, getKmsProvider(),
+        prefixManager);
     shutdownHook = () -> {
       saveOmMetrics();
     };
@@ -425,18 +426,14 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     } else {
       accessAuthorizer = null;
     }
-    ozAdmins = conf.getTrimmedStringCollection(OzoneConfigKeys
-        .OZONE_ADMINISTRATORS);
+    ozAdmins = conf.getTrimmedStringCollection(OZONE_ADMINISTRATORS);
     omMetaDir = OmUtils.getOmDbDir(configuration);
-
-    this.scmBlockSize = (long) conf
-        .getStorageSize(OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT,
-            StorageUnit.BYTES);
+    this.scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE,
+        OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
     this.preallocateBlocksMax = conf.getInt(
         OZONE_KEY_PREALLOCATION_BLOCKS_MAX,
         OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT);
-    this.grpcBlockTokenEnabled = conf.getBoolean(
-        HDDS_BLOCK_TOKEN_ENABLED,
+    this.grpcBlockTokenEnabled = conf.getBoolean(HDDS_BLOCK_TOKEN_ENABLED,
         HDDS_BLOCK_TOKEN_ENABLED_DEFAULT);
     this.useRatisForReplication = conf.getBoolean(
         DFS_CONTAINER_RATIS_ENABLED_KEY, DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
@@ -1685,7 +1682,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
             !ozAdmins.contains(ProtobufRpcEngine.Server.getRemoteUser()
                 .getUserName())) {
           LOG.error("Only admin users are authorized to create " +
-              "Ozone volumes.");
+              "Ozone volumes. User :{} is not an admin.",
+              ProtobufRpcEngine.Server.getRemoteUser().getUserName());
           throw new OMException("Only admin users are authorized to create " +
               "Ozone volumes.", ResultCodes.PERMISSION_DENIED);
         }

+ 48 - 4
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManagerImpl.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.om;
 import com.google.common.base.Strings;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.security.acl.RequestContext;
@@ -230,11 +231,45 @@ public class PrefixManagerImpl implements PrefixManager {
       OmPrefixInfo prefixInfo =
           metadataManager.getPrefixTable().get(prefixPath);
       OmPrefixInfo.Builder upiBuilder = OmPrefixInfo.newBuilder();
-      upiBuilder.setName(prefixPath).setAcls(acls);
+      List<OzoneAcl> aclsToBeSet = new ArrayList<>(acls.size());
+      aclsToBeSet.addAll(acls);
+      upiBuilder.setName(prefixPath);
       if (prefixInfo != null && prefixInfo.getMetadata() != null) {
         upiBuilder.addAllMetadata(prefixInfo.getMetadata());
       }
-      prefixInfo = upiBuilder.build();
+
+      // Inherit DEFAULT acls from prefix.
+      boolean prefixParentFound = false;
+      List<OmPrefixInfo> prefixList = getLongestPrefixPathHelper(
+          prefixTree.getLongestPrefix(prefixPath));
+
+      if (prefixList.size() > 0) {
+        // Add all acls from direct parent to key.
+        OmPrefixInfo parentPrefixInfo = prefixList.get(prefixList.size() - 1);
+        if (parentPrefixInfo != null) {
+          aclsToBeSet.addAll(OzoneUtils.getDefaultAcls(
+              parentPrefixInfo.getAcls()));
+          prefixParentFound = true;
+        }
+      }
+
+      // If no parent prefix is found inherit DEFULT acls from bucket.
+      if (!prefixParentFound) {
+        String bucketKey = metadataManager.getBucketKey(obj.getVolumeName(),
+            obj.getBucketName());
+        OmBucketInfo bucketInfo = metadataManager.getBucketTable().
+            get(bucketKey);
+        if (bucketInfo != null) {
+          bucketInfo.getAcls().forEach(a -> {
+            if (a.getAclScope().equals(OzoneAcl.AclScope.DEFAULT)) {
+              aclsToBeSet.add(new OzoneAcl(a.getType(), a.getName(),
+                  a.getAclBitSet(), OzoneAcl.AclScope.ACCESS));
+            }
+          });
+        }
+      }
+
+      prefixInfo = upiBuilder.setAcls(aclsToBeSet).build();
       prefixTree.insert(prefixPath, prefixInfo);
       metadataManager.getPrefixTable().put(prefixPath, prefixInfo);
     } catch (IOException ex) {
@@ -317,13 +352,22 @@ public class PrefixManagerImpl implements PrefixManager {
     String prefixPath = prefixTree.getLongestPrefix(path);
     metadataManager.getLock().acquireLock(PREFIX_LOCK, prefixPath);
     try {
-      return prefixTree.getLongestPrefixPath(prefixPath).stream()
-          .map(c -> c.getValue()).collect(Collectors.toList());
+      return getLongestPrefixPathHelper(prefixPath);
     } finally {
       metadataManager.getLock().releaseLock(PREFIX_LOCK, prefixPath);
     }
   }
 
+  /**
+   * Get longest prefix path assuming caller take prefix lock.
+   * @param prefixPath
+   * @return list of prefix info.
+   */
+  private List<OmPrefixInfo> getLongestPrefixPathHelper(String prefixPath) {
+    return prefixTree.getLongestPrefixPath(prefixPath).stream()
+          .map(c -> c.getValue()).collect(Collectors.toList());
+  }
+
   /**
    * Helper method to validate ozone object.
    * @param obj

+ 2 - 3
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManagerImpl.java

@@ -80,9 +80,8 @@ public class S3BucketManagerImpl implements S3BucketManager {
   @Override
   public void createS3Bucket(String userName, String bucketName)
       throws IOException {
-    Preconditions.checkArgument(
-        Strings.isNotBlank(bucketName), "Bucket name cannot be null or empty.");
-
+    Preconditions.checkArgument(Strings.isNotBlank(bucketName), "Bucket" +
+        " name cannot be null or empty.");
     Preconditions.checkArgument(Strings.isNotBlank(userName), "User name " +
         "cannot be null or empty.");
 

+ 2 - 1
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/web/ozShell/bucket/UpdateBucketHandler.java

@@ -75,7 +75,8 @@ public class UpdateBucketHandler extends Handler {
     if (addAcl != null) {
       String[] aclArray = addAcl.split(",");
       List<OzoneAcl> aclList =
-          Arrays.stream(aclArray).map(acl -> OzoneAcl.parseAcl(acl))
+          Arrays.stream(aclArray).map(acl -> OzoneAcl.parseAcl(acl
+          ))
               .collect(Collectors.toList());
       bucket.addAcls(aclList);
     }

+ 5 - 4
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestBucketManagerImpl.java

@@ -42,6 +42,7 @@ import org.junit.runner.RunWith;
 import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
 
+import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
 import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.*;
 
 /**
@@ -220,7 +221,7 @@ public class TestBucketManagerImpl {
 
     List<OzoneAcl> acls = new LinkedList<>();
     OzoneAcl ozoneAcl = new OzoneAcl(ACLIdentityType.USER,
-        "root", ACLType.READ);
+        "root", ACLType.READ, ACCESS);
     acls.add(ozoneAcl);
     BucketManager bucketManager = new BucketManagerImpl(metaMgr);
     OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
@@ -238,7 +239,7 @@ public class TestBucketManagerImpl {
     Assert.assertEquals(1, result.getAcls().size());
     List<OzoneAcl> addAcls = new LinkedList<>();
     OzoneAcl newAcl = new OzoneAcl(ACLIdentityType.USER,
-        "ozone", ACLType.READ);
+        "ozone", ACLType.READ, ACCESS);
     addAcls.add(newAcl);
     OmBucketArgs bucketArgs = OmBucketArgs.newBuilder()
         .setVolumeName("sampleVol")
@@ -259,9 +260,9 @@ public class TestBucketManagerImpl {
 
     List<OzoneAcl> acls = new LinkedList<>();
     OzoneAcl aclOne = new OzoneAcl(ACLIdentityType.USER,
-        "root", ACLType.READ);
+        "root", ACLType.READ, ACCESS);
     OzoneAcl aclTwo = new OzoneAcl(ACLIdentityType.USER,
-        "ozone", ACLType.READ);
+        "ozone", ACLType.READ, ACCESS);
     acls.add(aclOne);
     acls.add(aclTwo);
     BucketManager bucketManager = new BucketManagerImpl(metaMgr);