Browse Source

HDDS-1611. Evaluate ACL on volume bucket key and prefix to authorize access. Contributed by Ajay Kumar. (#973)

(cherry picked from commit cdb20adfcce22beb4f232f91822b190119d098ce)
Ajay Yadav 5 năm trước cách đây
mục cha
commit
84cdacbb2a
43 tập tin đã thay đổi với 1315 bổ sung185 xóa
  1. 6 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
  2. 0 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
  3. 1 0
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
  4. 29 17
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OzoneAcl.java
  5. 68 12
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmOzoneAclMap.java
  6. 2 0
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
  7. 2 13
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OMPBHelper.java
  8. 13 2
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/IAccessAuthorizer.java
  9. 101 1
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/web/utils/OzoneUtils.java
  10. 1 12
      hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
  11. 6 2
      hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOzoneAcls.java
  12. 1 0
      hadoop-ozone/dist/src/main/compose/ozonesecure-mr/docker-config
  13. 4 1
      hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
  14. 1 1
      hadoop-ozone/dist/src/main/smoketest/__init__.robot
  15. 11 10
      hadoop-ozone/dist/src/main/smoketest/basic/ozone-shell.robot
  16. 3 2
      hadoop-ozone/dist/src/main/smoketest/commonlib.robot
  17. 1 1
      hadoop-ozone/dist/src/main/smoketest/createbucketenv.robot
  18. 1 1
      hadoop-ozone/dist/src/main/smoketest/createmrenv.robot
  19. 1 1
      hadoop-ozone/dist/src/main/smoketest/kinit.robot
  20. 6 6
      hadoop-ozone/dist/src/main/smoketest/ozonefs/ozonefs.robot
  21. 1 1
      hadoop-ozone/dist/src/main/smoketest/s3/awss3.robot
  22. 42 8
      hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-fs.robot
  23. 3 0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
  24. 3 2
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
  25. 6 6
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java
  26. 4 2
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
  27. 7 6
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java
  28. 464 0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneNativeAuthorizer.java
  29. 3 1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestVolume.java
  30. 46 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
  31. 13 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/IOzoneAcl.java
  32. 1 1
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
  33. 76 22
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
  34. 105 36
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
  35. 39 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManagerImpl.java
  36. 21 3
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManagerImpl.java
  37. 62 4
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
  38. 2 4
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
  39. 120 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java
  40. 22 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/package-info.java
  41. 1 1
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/web/ozShell/volume/ListVolumeHandler.java
  42. 15 4
      hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
  43. 1 1
      hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java

+ 6 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java

@@ -120,6 +120,10 @@ public final class OzoneConfigKeys {
    * */
   public static final String OZONE_ADMINISTRATORS =
       "ozone.administrators";
+  /**
+   * Used only for testing purpose. Results in making every user an admin.
+   * */
+  public static final String OZONE_ADMINISTRATORS_WILDCARD = "*";
 
   public static final String OZONE_CLIENT_PROTOCOL =
       "ozone.client.protocol";
@@ -390,6 +394,8 @@ public final class OzoneConfigKeys {
       "ozone.acl.authorizer.class";
   public static final String OZONE_ACL_AUTHORIZER_CLASS_DEFAULT =
       "org.apache.hadoop.ozone.security.acl.OzoneAccessAuthorizer";
+  public static final String OZONE_ACL_AUTHORIZER_CLASS_NATIVE =
+      "org.apache.hadoop.ozone.security.acl.OzoneNativeAuthorizer";
   public static final String OZONE_ACL_ENABLED =
       "ozone.acl.enabled";
   public static final boolean OZONE_ACL_ENABLED_DEFAULT =

+ 0 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java

@@ -219,7 +219,6 @@ public class TestDiskBalancer {
     } finally {
       cluster.shutdown();
     }
-
   }
 
   @Test

+ 1 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java

@@ -892,6 +892,7 @@ public class RpcClient implements ClientProtocol {
         .setBucketName(bucketName)
         .setKeyName(keyName)
         .setMultipartUploadID(uploadID)
+        .setAcls(getAclList())
         .build();
 
     OmMultipartUploadList omMultipartUploadList = new OmMultipartUploadList(

+ 29 - 17
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OzoneAcl.java

@@ -20,8 +20,8 @@
 package org.apache.hadoop.ozone;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.google.protobuf.ByteString;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclRights;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclType;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
@@ -30,6 +30,7 @@ import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * OzoneACL classes define bucket ACLs used in OZONE.
@@ -46,6 +47,7 @@ public class OzoneAcl {
   private ACLIdentityType type;
   private String name;
   private BitSet aclBitSet;
+  private static final List<ACLType> EMPTY_LIST = new ArrayList<>(0);
   public static final BitSet ZERO_BITSET = new BitSet(0);
 
   /**
@@ -66,8 +68,16 @@ public class OzoneAcl {
     this.aclBitSet = new BitSet(ACLType.getNoOfAcls());
     aclBitSet.set(acl.ordinal(), true);
     this.type = type;
-    if (type == ACLIdentityType.WORLD && name.length() != 0) {
-      throw new IllegalArgumentException("Unexpected name part in world type");
+    if (type == ACLIdentityType.WORLD || type == ACLIdentityType.ANONYMOUS) {
+      if (!name.equals(ACLIdentityType.WORLD.name()) &&
+          !name.equals(ACLIdentityType.ANONYMOUS.name()) &&
+          name.length() != 0) {
+        throw new IllegalArgumentException("Unexpected name:{" + name +
+            "} for type WORLD, ANONYMOUS. It should be WORLD & " +
+            "ANONYMOUS respectively.");
+      }
+      // For type WORLD and ANONYMOUS we allow only one acl to be set.
+      this.name = type.name();
     }
     if (((type == ACLIdentityType.USER) || (type == ACLIdentityType.GROUP))
         && (name.length() == 0)) {
@@ -91,14 +101,20 @@ public class OzoneAcl {
           "size. bitset size:" + acls.cardinality() + ", bitset:"
           + acls.toString());
     }
-
     this.aclBitSet = (BitSet) acls.clone();
-    acls.stream().forEach(a -> aclBitSet.set(a));
 
     this.name = name;
     this.type = type;
-    if (type == ACLIdentityType.WORLD && name.length() != 0) {
-      throw new IllegalArgumentException("Unexpected name part in world type");
+    if (type == ACLIdentityType.WORLD || type == ACLIdentityType.ANONYMOUS) {
+      if (!name.equals(ACLIdentityType.WORLD.name()) &&
+          !name.equals(ACLIdentityType.ANONYMOUS.name()) &&
+          name.length() != 0) {
+        throw new IllegalArgumentException("Unexpected name:{" + name +
+            "} for type WORLD, ANONYMOUS. It should be WORLD & " +
+            "ANONYMOUS respectively.");
+      }
+      // For type WORLD and ANONYMOUS we allow only one acl to be set.
+      this.name = type.name();
     }
     if (((type == ACLIdentityType.USER) || (type == ACLIdentityType.GROUP))
         && (name.length() == 0)) {
@@ -161,17 +177,13 @@ public class OzoneAcl {
   public static OzoneAclInfo toProtobuf(OzoneAcl acl) {
     OzoneAclInfo.Builder builder = OzoneAclInfo.newBuilder()
         .setName(acl.getName())
-        .setType(OzoneAclType.valueOf(acl.getType().name()));
-    acl.getAclBitSet().stream().forEach(a ->
-        builder.addRights(OzoneAclRights.valueOf(ACLType.values()[a].name())));
+        .setType(OzoneAclType.valueOf(acl.getType().name()))
+        .setRights(ByteString.copyFrom(acl.getAclBitSet().toByteArray()));
     return builder.build();
   }
 
   public static OzoneAcl fromProtobuf(OzoneAclInfo protoAcl) {
-    BitSet aclRights = new BitSet(ACLType.getNoOfAcls());
-    protoAcl.getRightsList().parallelStream().forEach(a ->
-        aclRights.set(a.ordinal()));
-
+    BitSet aclRights = BitSet.valueOf(protoAcl.getRights().toByteArray());
     return new OzoneAcl(ACLIdentityType.valueOf(protoAcl.getType().name()),
         protoAcl.getName(), aclRights);
   }
@@ -215,11 +227,11 @@ public class OzoneAcl {
   }
 
   public List<ACLType> getAclList() {
-    List<ACLType> acls = new ArrayList<>(ACLType.getNoOfAcls());
     if(aclBitSet !=  null) {
-      aclBitSet.stream().forEach(a -> acls.add(ACLType.values()[a]));
+      return aclBitSet.stream().mapToObj(a ->
+          ACLType.values()[a]).collect(Collectors.toList());
     }
-    return acls;
+    return EMPTY_LIST;
   }
 
   /**

+ 68 - 12
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmOzoneAclMap.java

@@ -18,15 +18,17 @@
 
 package org.apache.hadoop.ozone.om.helpers;
 
+import com.google.protobuf.ByteString;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.protocol.proto
     .OzoneManagerProtocolProtos.OzoneAclInfo;
-import org.apache.hadoop.ozone.protocol.proto
-    .OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclRights;
 import org.apache.hadoop.ozone.protocol.proto
     .OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclType;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 
 import java.util.BitSet;
 import java.util.List;
@@ -38,7 +40,8 @@ import java.util.Objects;
 
 import static org.apache.hadoop.ozone.OzoneAcl.ZERO_BITSET;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
-import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclRights.ALL;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.NONE;
 
 /**
  * This helper class keeps a map of all user and their permissions.
@@ -92,7 +95,7 @@ public class OmOzoneAclMap {
         throw new OMException("Acl " + acl + " already exist.",
             INVALID_REQUEST);
       }
-      getMap(aclType).get(acl.getName()).or(acl.getAclBitSet());
+      getMap(aclType).replace(acl.getName(), temp);
     }
   }
 
@@ -143,8 +146,7 @@ public class OmOzoneAclMap {
   public void addAcl(OzoneAclInfo acl) throws OMException {
     Objects.requireNonNull(acl, "Acl should not be null.");
     if (!getMap(acl.getType()).containsKey(acl.getName())) {
-      BitSet acls = new BitSet(OzoneAclRights.values().length);
-      acl.getRightsList().parallelStream().forEach(a -> acls.set(a.ordinal()));
+      BitSet acls = BitSet.valueOf(acl.getRights().toByteArray());
       getMap(acl.getType()).put(acl.getName(), acls);
     } else {
       // throw exception if acl is already added.
@@ -163,11 +165,66 @@ public class OmOzoneAclMap {
     if (aclBitSet == null) {
       return false;
     }
+    BitSet result = BitSet.valueOf(acl.getRights().toByteArray());
+    result.and(aclBitSet);
+    return (!result.equals(ZERO_BITSET) || aclBitSet.get(ALL.ordinal()))
+        && !aclBitSet.get(NONE.ordinal());
+  }
+
+  /**
+   * For a given acl, check if the user has access rights.
+   * Acl's are checked in followoing order:
+   * 1. Acls for USER.
+   * 2. Acls for GROUPS.
+   * 3. Acls for WORLD.
+   * 4. Acls for ANONYMOUS.
+   * @param acl
+   * @param ugi
+   *
+   * @return true if given ugi has acl set, else false.
+   * */
+  public boolean hasAccess(ACLType acl, UserGroupInformation ugi) {
+    if (acl == null) {
+      return false;
+    }
+    if (ugi == null) {
+      return false;
+    }
 
-    for (OzoneAclRights right : acl.getRightsList()) {
-      if (aclBitSet.get(right.ordinal()) || aclBitSet.get(ALL.ordinal())) {
+    // Check acls in user acl list.
+    return checkAccessForOzoneAclType(OzoneAclType.USER, acl, ugi)
+        || checkAccessForOzoneAclType(OzoneAclType.GROUP, acl, ugi)
+        || checkAccessForOzoneAclType(OzoneAclType.WORLD, acl, ugi)
+        || checkAccessForOzoneAclType(OzoneAclType.ANONYMOUS, acl, ugi);
+  }
+
+  /**
+   * Helper function to check acl access for OzoneAclType.
+   * */
+  private boolean checkAccessForOzoneAclType(OzoneAclType identityType,
+      ACLType acl, UserGroupInformation ugi) {
+
+    switch (identityType) {
+    case USER:
+      return OzoneUtils.checkIfAclBitIsSet(acl, getAcl(identityType,
+          ugi.getUserName()));
+    case GROUP:
+      // Check access for user groups.
+      for (String userGroup : ugi.getGroupNames()) {
+        if (OzoneUtils.checkIfAclBitIsSet(acl, getAcl(identityType,
+            userGroup))) {
+          // Return true if any user group has required permission.
+          return true;
+        }
+      }
+      break;
+    default:
+      // For type WORLD and ANONYMOUS we set acl type as name.
+      if(OzoneUtils.checkIfAclBitIsSet(acl, getAcl(identityType,
+          identityType.name()))) {
         return true;
       }
+
     }
     return false;
   }
@@ -180,13 +237,12 @@ public class OmOzoneAclMap {
           aclMaps.get(type.ordinal()).entrySet()) {
         OzoneAclInfo.Builder builder = OzoneAclInfo.newBuilder()
             .setName(entry.getKey())
-            .setType(type);
-        entry.getValue().stream().forEach(a ->
-            builder.addRights(OzoneAclRights.values()[a]));
+            .setType(type)
+            .setRights(ByteString.copyFrom(entry.getValue().toByteArray()));
+
         aclList.add(builder.build());
       }
     }
-
     return aclList;
   }
 

+ 2 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java

@@ -1073,6 +1073,8 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
         .setVolumeName(omKeyArgs.getVolumeName())
         .setBucketName(omKeyArgs.getBucketName())
         .setKeyName(omKeyArgs.getKeyName())
+        .addAllAcls(omKeyArgs.getAcls().stream().map(a ->
+            OzoneAcl.toProtobuf(a)).collect(Collectors.toList()))
         .setMultipartUploadID(omKeyArgs.getMultipartUploadID());
 
     multipartUploadCompleteRequest.setKeyArgs(keyArgs.build());

+ 2 - 13
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OMPBHelper.java

@@ -38,17 +38,12 @@ import org.apache.hadoop.ozone.protocol.proto
     .OzoneManagerProtocolProtos.OzoneAclInfo;
 import org.apache.hadoop.ozone.protocol.proto
     .OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclType;
-import org.apache.hadoop.ozone.protocol.proto
-    .OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclRights;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.token.Token;
 
 import java.util.BitSet;
-import java.util.List;
-import java.util.ArrayList;
 
 /**
  * Utilities for converting protobuf classes.
@@ -84,14 +79,10 @@ public final class OMPBHelper {
     default:
       throw new IllegalArgumentException("ACL type is not recognized");
     }
-    List<OzoneAclRights> ozAclRights =
-        new ArrayList<>(acl.getAclBitSet().cardinality());
-    acl.getAclBitSet().stream().forEach(a -> ozAclRights.add(
-        OzoneAclRights.valueOf(ACLType.values()[a].name())));
 
     return OzoneAclInfo.newBuilder().setType(aclType)
         .setName(acl.getName())
-        .addAllRights(ozAclRights)
+        .setRights(ByteString.copyFrom(acl.getAclBitSet().toByteArray()))
         .build();
   }
 
@@ -121,9 +112,7 @@ public final class OMPBHelper {
       throw new IllegalArgumentException("ACL type is not recognized");
     }
 
-    BitSet aclRights = new BitSet(ACLType.getNoOfAcls());
-    aclInfo.getRightsList().stream().forEach(a ->
-        aclRights.set(ACLType.valueOf(a.name()).ordinal()));
+    BitSet aclRights = BitSet.valueOf(aclInfo.getRights().toByteArray());
     return new OzoneAcl(aclType, aclInfo.getName(), aclRights);
   }
 

+ 13 - 2
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/IAccessAuthorizer.java

@@ -56,11 +56,20 @@ public interface IAccessAuthorizer {
     ALL,
     NONE;
     private static int length = ACLType.values().length;
+    private static ACLType[] vals = ACLType.values();
 
     public static int getNoOfAcls() {
       return length;
     }
 
+    public static ACLType getAclTypeFromOrdinal(int ordinal) {
+      if (ordinal > length - 1 && ordinal > -1) {
+        throw new IllegalArgumentException("Ordinal greater than array lentgh" +
+            ". ordinal:" + ordinal);
+      }
+      return vals[ordinal];
+    }
+
     /**
      * Returns the ACL rights based on passed in String.
      *
@@ -145,9 +154,11 @@ public interface IAccessAuthorizer {
   enum ACLIdentityType {
     USER(OzoneConsts.OZONE_ACL_USER_TYPE),
     GROUP(OzoneConsts.OZONE_ACL_GROUP_TYPE),
-    CLIENT_IP(OzoneConsts.OZONE_ACL_IP_TYPE),
     WORLD(OzoneConsts.OZONE_ACL_WORLD_TYPE),
-    ANONYMOUS(OzoneConsts.OZONE_ACL_ANONYMOUS_TYPE);
+    ANONYMOUS(OzoneConsts.OZONE_ACL_ANONYMOUS_TYPE),
+    CLIENT_IP(OzoneConsts.OZONE_ACL_IP_TYPE);
+
+    // TODO: Add support for acl checks based on CLIENT_IP.
 
     @Override
     public String toString() {

+ 101 - 1
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/web/utils/OzoneUtils.java

@@ -24,6 +24,7 @@ import java.nio.charset.Charset;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.TimeZone;
@@ -37,11 +38,16 @@ import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConsts;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
+import org.apache.hadoop.ozone.security.acl.RequestContext;
 import org.apache.ratis.util.TimeDuration;
 
 import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.GROUP;
 import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.USER;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.NONE;
 
 /**
  * Set of Utility functions used in ozone.
@@ -260,9 +266,103 @@ public final class OzoneUtils {
     listOfAcls.add(new OzoneAcl(USER, userName, userRights));
     if(userGroups != null) {
       // Group ACLs of the User.
-      userGroups.stream().forEach((group) -> listOfAcls.add(
+      userGroups.forEach((group) -> listOfAcls.add(
           new OzoneAcl(GROUP, group, groupRights)));
     }
     return listOfAcls;
   }
+
+  /**
+   * Check if acl right requested for given RequestContext exist
+   * in provided acl list.
+   * Acl validation rules:
+   * 1. If user/group has ALL bit set than all user should have all rights.
+   * 2. If user/group has NONE bit set than user/group will not have any right.
+   * 3. For all other individual rights individual bits should be set.
+   *
+   * @param acls
+   * @param context
+   * @return return true if acl list contains right requsted in context.
+   * */
+  public static boolean checkAclRight(List<OzoneAclInfo> acls,
+      RequestContext context) throws OMException {
+    String[] userGroups = context.getClientUgi().getGroupNames();
+    String userName = context.getClientUgi().getUserName();
+    ACLType aclToCheck = context.getAclRights();
+    for (OzoneAclInfo a : acls) {
+      if(checkAccessInAcl(a, userGroups, userName, aclToCheck)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private static boolean checkAccessInAcl(OzoneAclInfo a, String[] groups,
+      String username, ACLType aclToCheck) {
+    BitSet rights = BitSet.valueOf(a.getRights().toByteArray());
+    switch (a.getType()) {
+    case USER:
+      if (a.getName().equals(username)) {
+        return checkIfAclBitIsSet(aclToCheck, rights);
+      }
+      break;
+    case GROUP:
+      for (String grp : groups) {
+         // TODO: Convert ozone acls to proto map format for efficient
+        //  acl checks.
+        if (a.getName().equals(grp)) {
+          return checkIfAclBitIsSet(aclToCheck, rights);
+        }
+      }
+      break;
+
+    default:
+      return checkIfAclBitIsSet(aclToCheck, rights);
+    }
+    return false;
+  }
+
+  /**
+   * Check if acl right requested for given RequestContext exist
+   * in provided acl list.
+   * Acl validation rules:
+   * 1. If user/group has ALL bit set than all user should have all rights.
+   * 2. If user/group has NONE bit set than user/group will not have any right.
+   * 3. For all other individual rights individual bits should be set.
+   *
+   * @param acls
+   * @param context
+   * @return return true if acl list contains right requsted in context.
+   * */
+  public static boolean checkAclRights(List<OzoneAcl> acls,
+      RequestContext context) throws OMException {
+    String[] userGroups = context.getClientUgi().getGroupNames();
+    String userName = context.getClientUgi().getUserName();
+    ACLType aclToCheck = context.getAclRights();
+    // TODO: All ozone types should use one data type for acls. i.e Store
+    //  and maintain acls in proto format only.
+    for (OzoneAcl a : acls) {
+      if (checkAccessInAcl(OzoneAcl.toProtobuf(a), userGroups,
+          userName, aclToCheck)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Helper function to check if bit for given acl is set.
+   * @param acl
+   * @param bitset
+   * @return True of acl bit is set else false.
+   * */
+  public static boolean checkIfAclBitIsSet(ACLType acl, BitSet bitset) {
+    if (bitset == null) {
+      return false;
+    }
+
+    return ((bitset.get(acl.ordinal())
+        || bitset.get(ALL.ordinal()))
+        && !bitset.get(NONE.ordinal()));
+  }
 }

+ 1 - 12
hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto

@@ -507,20 +507,9 @@ message OzoneAclInfo {
         CLIENT_IP = 5;
     }
 
-    enum OzoneAclRights {
-        READ = 1;
-        WRITE = 2;
-        CREATE = 3;
-        LIST = 4;
-        DELETE = 5;
-        READ_ACL = 6;
-        WRITE_ACL = 7;
-        ALL = 8;
-        NONE = 9;
-    }
     required OzoneAclType type = 1;
     required string name = 2;
-    repeated OzoneAclRights rights = 3;
+    required bytes rights = 3;
 }
 
 message GetAclRequest {

+ 6 - 2
hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOzoneAcls.java

@@ -105,6 +105,10 @@ public class TestOzoneAcls {
 
     testMatrix.put(" world:bilbo:w", Boolean.FALSE);
     testMatrix.put(" world:bilbo:rw", Boolean.FALSE);
+    testMatrix.put(" anonymous:bilbo:w", Boolean.FALSE);
+    testMatrix.put(" anonymous:ANONYMOUS:w", Boolean.TRUE);
+    testMatrix.put(" anonymous::rw", Boolean.TRUE);
+    testMatrix.put(" world:WORLD:rw", Boolean.TRUE);
 
     Set<String> keys = testMatrix.keySet();
     for (String key : keys) {
@@ -157,7 +161,7 @@ public class TestOzoneAcls {
     assertEquals(ACLIdentityType.GROUP, acl.getType());
 
     acl = OzoneAcl.parseAcl("world::a");
-    assertEquals(acl.getName(), "");
+    assertEquals(acl.getName(), "WORLD");
     assertTrue(acl.getAclBitSet().get(ALL.ordinal()));
     assertFalse(acl.getAclBitSet().get(WRITE.ordinal()));
     assertEquals(ACLIdentityType.WORLD, acl.getType());
@@ -188,7 +192,7 @@ public class TestOzoneAcls {
     assertEquals(ACLIdentityType.GROUP, acl.getType());
 
     acl = OzoneAcl.parseAcl("world::rwdlncxy");
-    assertEquals(acl.getName(), "");
+    assertEquals(acl.getName(), "WORLD");
     assertTrue(acl.getAclBitSet().get(READ.ordinal()));
     assertTrue(acl.getAclBitSet().get(WRITE.ordinal()));
     assertTrue(acl.getAclBitSet().get(DELETE.ordinal()));

+ 1 - 0
hadoop-ozone/dist/src/main/compose/ozonesecure-mr/docker-config

@@ -31,6 +31,7 @@ OZONE-SITE.XML_ozone.om.kerberos.principal=om/om@EXAMPLE.COM
 OZONE-SITE.XML_ozone.om.kerberos.keytab.file=/etc/security/keytabs/om.keytab
 OZONE-SITE.XML_ozone.s3g.keytab.file=/etc/security/keytabs/HTTP.keytab
 OZONE-SITE.XML_ozone.s3g.authentication.kerberos.principal=HTTP/s3g@EXAMPLE.COM
+OZONE_SITE.XML_ozone.administrators=*
 
 OZONE-SITE.XML_ozone.security.enabled=true
 OZONE-SITE.XML_hdds.scm.http.kerberos.principal=HTTP/scm@EXAMPLE.COM

+ 4 - 1
hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config

@@ -35,6 +35,9 @@ OZONE-SITE.XML_ozone.s3g.keytab.file=/etc/security/keytabs/HTTP.keytab
 OZONE-SITE.XML_ozone.s3g.authentication.kerberos.principal=HTTP/s3g@EXAMPLE.COM
 
 OZONE-SITE.XML_ozone.security.enabled=true
+OZONE-SITE.XML_ozone.acl.enabled=true
+OZONE-SITE.XML_ozone.acl.authorizer.class=org.apache.hadoop.ozone.security.acl.OzoneNativeAuthorizer
+OZONE-SITE.XML_ozone.administrators=*
 OZONE-SITE.XML_hdds.scm.http.kerberos.principal=HTTP/scm@EXAMPLE.COM
 OZONE-SITE.XML_hdds.scm.http.kerberos.keytab.file=/etc/security/keytabs/HTTP.keytab
 OZONE-SITE.XML_ozone.om.http.kerberos.principal=HTTP/om@EXAMPLE.COM
@@ -112,7 +115,7 @@ LOG4J2.PROPERTIES_rootLogger.appenderRef.stdout.ref=STDOUT
 OZONE_DATANODE_SECURE_USER=root
 SECURITY_ENABLED=true
 KEYTAB_DIR=/etc/security/keytabs
-KERBEROS_KEYTABS=dn om scm HTTP testuser s3g
+KERBEROS_KEYTABS=dn om scm HTTP testuser testuser2 s3g
 KERBEROS_KEYSTORES=hadoop
 KERBEROS_SERVER=kdc
 JAVA_HOME=/usr/lib/jvm/jre

+ 1 - 1
hadoop-ozone/dist/src/main/smoketest/__init__.robot

@@ -15,4 +15,4 @@
 *** Settings ***
 Documentation       Smoketest ozone secure cluster
 Resource            commonlib.robot
-Suite Setup         Run Keyword if    '${SECURITY_ENABLED}' == 'true'     Kinit test user
+Suite Setup         Run Keyword if    '${SECURITY_ENABLED}' == 'true'    Kinit test user     testuser     testuser.keytab

+ 11 - 10
hadoop-ozone/dist/src/main/smoketest/basic/ozone-shell.robot

@@ -44,16 +44,17 @@ RpcClient without scheme
 *** Keywords ***
 Test ozone shell
     [arguments]     ${protocol}         ${server}       ${volume}
-    ${result} =     Execute             ozone sh volume create ${protocol}${server}/${volume} --user bilbo --quota 100TB --root
+    ${result} =     Execute             ozone sh volume create ${protocol}${server}/${volume} --quota 100TB
                     Should not contain  ${result}       Failed
                     Should contain      ${result}       Creating Volume: ${volume}
-    ${result} =     Execute             ozone sh volume list ${protocol}${server}/ --user bilbo | grep -Ev 'Removed|WARN|DEBUG|ERROR|INFO|TRACE' | jq -r '.[] | select(.volumeName=="${volume}")'
+    ${result} =     Execute             ozone sh volume list ${protocol}${server}/ | grep -Ev 'Removed|WARN|DEBUG|ERROR|INFO|TRACE' | jq -r '.[] | select(.volumeName=="${volume}")'
                     Should contain      ${result}       createdOn
-    ${result} =     Execute             ozone sh volume list --user bilbo | grep -Ev 'Removed|DEBUG|ERROR|INFO|TRACE|WARN' | jq -r '.[] | select(.volumeName=="${volume}")'
+    ${result} =     Execute             ozone sh volume list | grep -Ev 'Removed|DEBUG|ERROR|INFO|TRACE|WARN' | jq -r '.[] | select(.volumeName=="${volume}")'
                     Should contain      ${result}       createdOn
-                    Execute             ozone sh volume update ${protocol}${server}/${volume} --user bill --quota 10TB
-    ${result} =     Execute             ozone sh volume info ${protocol}${server}/${volume} | grep -Ev 'Removed|WARN|DEBUG|ERROR|INFO|TRACE' | jq -r '. | select(.volumeName=="${volume}") | .owner | .name'
-                    Should Be Equal     ${result}       bill
+# TODO: Disable updating the owner, acls should be used to give access to other user.        
+                    Execute             ozone sh volume update ${protocol}${server}/${volume} --quota 10TB
+#    ${result} =     Execute             ozone sh volume info ${protocol}${server}/${volume} | grep -Ev 'Removed|WARN|DEBUG|ERROR|INFO|TRACE' | jq -r '. | select(.volumeName=="${volume}") | .owner | .name'
+#                    Should Be Equal     ${result}       bill
     ${result} =     Execute             ozone sh volume info ${protocol}${server}/${volume} | grep -Ev 'Removed|WARN|DEBUG|ERROR|INFO|TRACE' | jq -r '. | select(.volumeName=="${volume}") | .quota | .size'
                     Should Be Equal     ${result}       10
                     Execute             ozone sh bucket create ${protocol}${server}/${volume}/bb1
@@ -67,7 +68,7 @@ Test ozone shell
                     Should Be Equal     ${result}       ${volume}
                     Run Keyword         Test key handling       ${protocol}       ${server}       ${volume}
                     Execute             ozone sh bucket delete ${protocol}${server}/${volume}/bb1
-                    Execute             ozone sh volume delete ${protocol}${server}/${volume} --user bilbo
+                    Execute             ozone sh volume delete ${protocol}${server}/${volume}
 
 Test Volume Acls
     [arguments]     ${protocol}         ${server}       ${volume}
@@ -80,7 +81,7 @@ Test Volume Acls
     ${result} =     Execute             ozone sh volume removeacl ${protocol}${server}/${volume} -a user:superuser1:xy
     ${result} =     Execute             ozone sh volume getacl ${protocol}${server}/${volume}
     Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1\",\n.*\"aclList\" : . \"READ\", \"WRITE\"
-    ${result} =     Execute             ozone sh volume setacl ${protocol}${server}/${volume} -al user:superuser1:rwxy,group:superuser1:a
+    ${result} =     Execute             ozone sh volume setacl ${protocol}${server}/${volume} -al user:superuser1:rwxy,group:superuser1:a,user:testuser/scm@EXAMPLE.COM:rwxyc
     ${result} =     Execute             ozone sh volume getacl ${protocol}${server}/${volume}
     Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1*\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
     Should Match Regexp                 ${result}       \"type\" : \"GROUP\",\n.*\"name\" : \"superuser1\",\n.*\"aclList\" : . \"ALL\"
@@ -96,7 +97,7 @@ Test Bucket Acls
     ${result} =     Execute             ozone sh bucket removeacl ${protocol}${server}/${volume}/bb1 -a user:superuser1:xy
     ${result} =     Execute             ozone sh bucket getacl ${protocol}${server}/${volume}/bb1
     Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1\",\n.*\"aclList\" : . \"READ\", \"WRITE\"
-    ${result} =     Execute             ozone sh bucket setacl ${protocol}${server}/${volume}/bb1 -al user:superuser1:rwxy,group:superuser1:a
+    ${result} =     Execute             ozone sh bucket setacl ${protocol}${server}/${volume}/bb1 -al user:superuser1:rwxy,group:superuser1:a,user:testuser/scm@EXAMPLE.COM:rwxyc
     ${result} =     Execute             ozone sh bucket getacl ${protocol}${server}/${volume}/bb1
     Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1*\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
     Should Match Regexp                 ${result}       \"type\" : \"GROUP\",\n.*\"name\" : \"superuser1\",\n.*\"aclList\" : . \"ALL\"
@@ -128,7 +129,7 @@ Test key Acls
     ${result} =     Execute             ozone sh key removeacl ${protocol}${server}/${volume}/bb1/key2 -a user:superuser1:xy
     ${result} =     Execute             ozone sh key getacl ${protocol}${server}/${volume}/bb1/key2
     Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1\",\n.*\"aclList\" : . \"READ\", \"WRITE\"
-    ${result} =     Execute             ozone sh key setacl ${protocol}${server}/${volume}/bb1/key2 -al user:superuser1:rwxy,group:superuser1:a
+    ${result} =     Execute             ozone sh key setacl ${protocol}${server}/${volume}/bb1/key2 -al user:superuser1:rwxy,group:superuser1:a,user:testuser/scm@EXAMPLE.COM:rwxyc
     ${result} =     Execute             ozone sh key getacl ${protocol}${server}/${volume}/bb1/key2
     Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1*\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
     Should Match Regexp                 ${result}       \"type\" : \"GROUP\",\n.*\"name\" : \"superuser1\",\n.*\"aclList\" : . \"ALL\"

+ 3 - 2
hadoop-ozone/dist/src/main/smoketest/commonlib.robot

@@ -55,6 +55,7 @@ Install aws cli
     Run Keyword if     '${rc}' == '0'              Install aws cli s3 centos
 
 Kinit test user
+    [arguments]                      ${user}       ${keytab}
     ${hostname} =       Execute                    hostname
-    Set Suite Variable  ${TEST_USER}               testuser/${hostname}@EXAMPLE.COM
-    Wait Until Keyword Succeeds      2min       10sec      Execute            kinit -k ${TEST_USER} -t /etc/security/keytabs/testuser.keytab
+    Set Suite Variable  ${TEST_USER}               ${user}/${hostname}@EXAMPLE.COM
+    Wait Until Keyword Succeeds      2min       10sec      Execute            kinit -k ${user}/${hostname}@EXAMPLE.COM -t /etc/security/keytabs/${keytab}

+ 1 - 1
hadoop-ozone/dist/src/main/smoketest/createbucketenv.robot

@@ -27,7 +27,7 @@ ${bucket}       bucket1
 
 *** Keywords ***
 Create volume
-    ${result} =     Execute             ozone sh volume create /${volume} --user hadoop --quota 100TB --root
+    ${result} =     Execute             ozone sh volume create /${volume} --user hadoop --quota 100TB
                     Should not contain  ${result}       Failed
                     Should contain      ${result}       Creating Volume: ${volume}
 Create bucket

+ 1 - 1
hadoop-ozone/dist/src/main/smoketest/createmrenv.robot

@@ -27,7 +27,7 @@ ${bucket}       bucket1
 
 *** Keywords ***
 Create volume
-    ${result} =     Execute             ozone sh volume create /${volume} --user hadoop --quota 100TB --root
+    ${result} =     Execute             ozone sh volume create /${volume} --user hadoop --quota 100TB
                     Should not contain  ${result}       Failed
                     Should contain      ${result}       Creating Volume: ${volume}
 Create bucket

+ 1 - 1
hadoop-ozone/dist/src/main/smoketest/kinit.robot

@@ -22,4 +22,4 @@ Test Timeout        2 minute
 
 *** Test Cases ***
 Kinit
-   Kinit test user
+   Kinit test user     testuser     testuser.keytab

+ 6 - 6
hadoop-ozone/dist/src/main/smoketest/ozonefs/ozonefs.robot

@@ -23,11 +23,11 @@ Resource            ../commonlib.robot
 
 *** Test Cases ***
 Create volume and bucket
-    Execute             ozone sh volume create http://om/fstest --user bilbo --quota 100TB --root
-    Execute             ozone sh volume create http://om/fstest2 --user bilbo --quota 100TB --root
-    Execute             ozone sh bucket create http://om/fstest/bucket1
-    Execute             ozone sh bucket create http://om/fstest/bucket2
-    Execute             ozone sh bucket create http://om/fstest2/bucket3
+    Execute             ozone sh volume create o3://om/fstest --quota 100TB
+    Execute             ozone sh volume create o3://om/fstest2 --quota 100TB
+    Execute             ozone sh bucket create o3://om/fstest/bucket1
+    Execute             ozone sh bucket create o3://om/fstest/bucket2
+    Execute             ozone sh bucket create o3://om/fstest2/bucket3
 
 Check volume from ozonefs
     ${result} =         Execute               ozone fs -ls o3fs://bucket1.fstest/
@@ -109,4 +109,4 @@ Run ozoneFS tests
                         Execute               ls -l GET.txt
     ${rc}  ${result} =  Run And Return Rc And Output        ozone fs -ls o3fs://abcde.pqrs/
                         Should Be Equal As Integers     ${rc}                1
-                        Should contain    ${result}         not found
+                        Should Match Regexp    ${result}         (Check access operation failed)|(Volume pqrs is not found)

+ 1 - 1
hadoop-ozone/dist/src/main/smoketest/s3/awss3.robot

@@ -41,7 +41,7 @@ File upload and directory list
                         Should not contain        ${result}         testfile
                         Should not contain        ${result}         dir1
                         Should contain            ${result}         dir2
-    ${result} =         Execute AWSS3Cli          ls s3://${BUCKET}/dir1/dir2/
+    ${result} =         Execute AWSS3Cli          ls s3://${BUCKET}/dir1/dir2/file
                         Should not contain        ${result}         testfile
                         Should not contain        ${result}         dir1
                         Should contain            ${result}         file

+ 42 - 8
hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-fs.robot

@@ -33,15 +33,15 @@ Setup volume names
 *** Test Cases ***
 Create volume bucket with wrong credentials
     Execute             kdestroy
-    ${rc}               ${output} =          Run And Return Rc And Output       ozone sh volume create o3://om/fstest --user bilbo --quota 100TB --root
+    ${rc}               ${output} =          Run And Return Rc And Output       ozone sh volume create o3://om/fstest
     Should contain      ${output}       Client cannot authenticate via
 
 Create volume bucket with credentials
                         # Authenticate testuser
-    Run Keyword         Kinit test user
+    Run Keyword         Kinit test user     testuser     testuser.keytab
     Run Keyword         Setup volume names
-    Execute             ozone sh volume create o3://om/${volume1} --user bilbo --quota 100TB --root
-    Execute             ozone sh volume create o3://om/${volume2} --user bilbo --quota 100TB --root
+    Execute             ozone sh volume create o3://om/${volume1} 
+    Execute             ozone sh volume create o3://om/${volume2}
     Execute             ozone sh bucket create o3://om/${volume1}/bucket1
     Execute             ozone sh bucket create o3://om/${volume1}/bucket2
     Execute             ozone sh bucket create o3://om/${volume2}/bucket3
@@ -60,7 +60,7 @@ Test Volume Acls
     ${result} =     Execute             ozone sh volume removeacl ${volume3} -a user:superuser1:xy
     ${result} =     Execute             ozone sh volume getacl ${volume3}
     Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1\",\n.*\"aclList\" : . \"READ\", \"WRITE\"
-    ${result} =     Execute             ozone sh volume setacl ${volume3} -al user:superuser1:rwxy,group:superuser1:a
+    ${result} =     Execute             ozone sh volume setacl ${volume3} -al user:superuser1:rwxy,user:testuser/scm@EXAMPLE.COM:rwxyc,group:superuser1:a
     ${result} =     Execute             ozone sh volume getacl ${volume3}
     Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1*\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
     Should Match Regexp                 ${result}       \"type\" : \"GROUP\",\n.*\"name\" : \"superuser1\",\n.*\"aclList\" : . \"ALL\"
@@ -76,7 +76,7 @@ Test Bucket Acls
     ${result} =     Execute             ozone sh bucket removeacl ${volume3}/bk1 -a user:superuser1:xy
     ${result} =     Execute             ozone sh bucket getacl ${volume3}/bk1
     Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1\",\n.*\"aclList\" : . \"READ\", \"WRITE\"
-    ${result} =     Execute             ozone sh bucket setacl ${volume3}/bk1 -al user:superuser1:rwxy,group:superuser1:a
+    ${result} =     Execute             ozone sh bucket setacl ${volume3}/bk1 -al user:superuser1:rwxy,group:superuser1:a,user:testuser/scm@EXAMPLE.COM:rwxyc
     ${result} =     Execute             ozone sh bucket getacl ${volume3}/bk1
     Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1*\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
     Should Match Regexp                 ${result}       \"type\" : \"GROUP\",\n.*\"name\" : \"superuser1\",\n.*\"aclList\" : . \"ALL\"
@@ -91,7 +91,41 @@ Test key Acls
     ${result} =     Execute             ozone sh key removeacl ${volume3}/bk1/key1 -a user:superuser1:xy
     ${result} =     Execute             ozone sh key getacl ${volume3}/bk1/key1
     Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1\",\n.*\"aclList\" : . \"READ\", \"WRITE\"
-    ${result} =     Execute             ozone sh key setacl ${volume3}/bk1/key1 -al user:superuser1:rwxy,group:superuser1:a
+    ${result} =     Execute             ozone sh key setacl ${volume3}/bk1/key1 -al user:superuser1:rwxy,group:superuser1:a,user:testuser/scm@EXAMPLE.COM:rwxyc
     ${result} =     Execute             ozone sh key getacl ${volume3}/bk1/key1
     Should Match Regexp                 ${result}       \"type\" : \"USER\",\n.*\"name\" : \"superuser1*\",\n.*\"aclList\" : . \"READ\", \"WRITE\", \"READ_ACL\", \"WRITE_ACL\"
-    Should Match Regexp                 ${result}       \"type\" : \"GROUP\",\n.*\"name\" : \"superuser1\",\n.*\"aclList\" : . \"ALL\"
+    Should Match Regexp                 ${result}       \"type\" : \"GROUP\",\n.*\"name\" : \"superuser1\",\n.*\"aclList\" : . \"ALL\"
+
+Test native authorizer
+    Execute         ozone sh volume removeacl ${volume3} -a group:root:a
+    Execute         kdestroy
+    Run Keyword     Kinit test user     testuser2    testuser2.keytab
+    ${result} =     Execute And Ignore Error         ozone sh bucket list /${volume3}/    
+                    Should contain      ${result}    PERMISSION_DENIED
+    ${result} =     Execute And Ignore Error         ozone sh key list /${volume3}/bk1      
+                    Should contain      ${result}    PERMISSION_DENIED
+    ${result} =     Execute And Ignore Error         ozone sh volume addacl ${volume3} -a user:testuser2/scm@EXAMPLE.COM:xy
+                    Should contain      ${result}    PERMISSION_DENIED User testuser2/scm@EXAMPLE.COM doesn't have WRITE_ACL permission to access volume
+    Execute         kdestroy
+    Run Keyword     Kinit test user     testuser     testuser.keytab
+    Execute         ozone sh volume addacl ${volume3} -a user:testuser2/scm@EXAMPLE.COM:xyrw
+    Execute         kdestroy
+    Run Keyword     Kinit test user     testuser2    testuser2.keytab
+    ${result} =     Execute And Ignore Error         ozone sh bucket list /${volume3}/
+                    Should contain      ${result}    PERMISSION_DENIED org.apache.hadoop.ozone.om.exceptions.OMException: User testuser2/scm@EXAMPLE.COM doesn't have LIST permission to access volume
+    Execute         ozone sh volume addacl ${volume3} -a user:testuser2/scm@EXAMPLE.COM:l
+    Execute         ozone sh bucket list /${volume3}/
+    Execute         ozone sh volume getacl /${volume3}/
+    
+    ${result} =     Execute And Ignore Error         ozone sh key list /${volume3}/bk1  
+    Should contain      ${result}    PERMISSION_DENIED
+    Execute         kdestroy
+    Run Keyword     Kinit test user     testuser     testuser.keytab
+    Execute         ozone sh bucket addacl ${volume3}/bk1 -a user:testuser2/scm@EXAMPLE.COM:a
+    Execute         ozone sh bucket getacl /${volume3}/bk1
+    Execute         kdestroy
+    Run Keyword     Kinit test user     testuser2    testuser2.keytab
+    Execute         ozone sh bucket getacl /${volume3}/bk1
+    Execute         ozone sh key list /${volume3}/bk1
+    Execute         kdestroy
+    Run Keyword     Kinit test user     testuser    testuser.keytab

+ 3 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java

@@ -24,6 +24,8 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
 import org.apache.hadoop.ozone.s3.S3GatewayConfigKeys;
 
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE;
+
 /**
  * Tests if configuration constants documented in ozone-defaults.xml.
  */
@@ -49,6 +51,7 @@ public class TestOzoneConfigurationFields extends TestConfigurationFieldsBase {
     configurationPropsToSkipCompare.add(HddsConfigKeys.HDDS_SECURITY_PROVIDER);
     configurationPropsToSkipCompare.add(HddsConfigKeys.HDDS_GRPC_TLS_TEST_CERT);
     configurationPropsToSkipCompare.add(OMConfigKeys.OZONE_OM_NODES_KEY);
+    configurationPropsToSkipCompare.add(OZONE_ACL_AUTHORIZER_CLASS_NATIVE);
     configurationPropsToSkipCompare.add(OzoneConfigKeys.
         OZONE_S3_TOKEN_MAX_LIFETIME_KEY);
   }

+ 3 - 2
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java

@@ -292,7 +292,7 @@ public abstract class TestOzoneRpcClientAbstract {
   public void testCreateS3Bucket()
       throws IOException, OzoneException {
     long currentTime = Time.now();
-    String userName = "ozone";
+    String userName = UserGroupInformation.getCurrentUser().getUserName();
     String bucketName = UUID.randomUUID().toString();
     store.createS3Bucket(userName, bucketName);
     String volumeName = store.getOzoneVolumeName(bucketName);
@@ -2319,7 +2319,8 @@ public abstract class TestOzoneRpcClientAbstract {
       assertTrue(acls.size() == expectedAcls.size());
       for(OzoneAcl acl: acls) {
         if(acl.getName().equals(newAcl.getName())) {
-          assertFalse(acl.getAclList().contains(ACLType.READ_ACL));
+          assertFalse("READ_ACL should not exist in current acls:" +
+              acls, acl.getAclList().contains(ACLType.READ_ACL));
           aclVerified = true;
         }
       }

+ 6 - 6
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java

@@ -124,8 +124,8 @@ public class TestOmAcls {
     logCapturer.clearOutput();
     OzoneTestUtils.expectOmException(ResultCodes.PERMISSION_DENIED,
         () -> storageHandler.createVolume(createVolumeArgs));
-    assertTrue(logCapturer.getOutput().contains("doesn't have CREATE " +
-        "permission to access volume"));
+    assertTrue(logCapturer.getOutput().contains("Only admin users are " +
+        "authorized to create Ozone"));
 
     BucketArgs bucketArgs = new BucketArgs("bucket1", createVolumeArgs);
     bucketArgs.setAddAcls(new LinkedList<>());
@@ -133,8 +133,8 @@ public class TestOmAcls {
     bucketArgs.setStorageType(StorageType.DISK);
     OzoneTestUtils.expectOmException(ResultCodes.PERMISSION_DENIED,
         () -> storageHandler.createBucket(bucketArgs));
-    assertTrue(logCapturer.getOutput().contains("doesn't have CREATE " +
-        "permission to access bucket"));
+    assertTrue(logCapturer.getOutput().contains("Only admin users are" +
+        " authorized to create Ozone"));
   }
 
   @Test
@@ -157,8 +157,8 @@ public class TestOmAcls {
     KeyArgs keyArgs = new KeyArgs(keyName, bucketArgs);
     OzoneTestUtils.expectOmException(ResultCodes.PERMISSION_DENIED,
         () -> storageHandler.newKeyWriter(keyArgs));
-    assertTrue(logCapturer.getOutput().contains("doesn't have READ permission" +
-        " to access key"));
+    assertTrue(logCapturer.getOutput().contains("doesn't have WRITE " +
+        "permission to access key"));
   }
 }
 

+ 4 - 2
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.ozone.web.handlers.UserArgs;
 import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
 import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -94,8 +95,9 @@ public class TestOmBlockVersioning {
 
   @Test
   public void testAllocateCommit() throws Exception {
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
-    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    String userName = ugi.getUserName();
+    String adminName = ugi.getUserName();
     String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
     String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
     String keyName = "key" + RandomStringUtils.randomNumeric(5);

+ 7 - 6
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java

@@ -89,6 +89,8 @@ import org.apache.hadoop.utils.db.TableIterator;
 import org.apache.commons.lang3.RandomStringUtils;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
@@ -135,6 +137,7 @@ public class TestOzoneManager {
     omId = UUID.randomUUID().toString();
     conf.setBoolean(OZONE_ACL_ENABLED, true);
     conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
+    conf.set(OZONE_ADMINISTRATORS, OZONE_ADMINISTRATORS_WILDCARD);
     cluster =  MiniOzoneCluster.newBuilder(conf)
         .setClusterId(clusterId)
         .setScmId(scmId)
@@ -356,7 +359,7 @@ public class TestOzoneManager {
   // Create a volume and test Volume access for a different user
   @Test
   public void testAccessVolume() throws IOException, OzoneException {
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String userName = UserGroupInformation.getCurrentUser().getUserName();
     String adminName = "admin" + RandomStringUtils.randomNumeric(5);
     String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
     String[] groupName =
@@ -1012,8 +1015,8 @@ public class TestOzoneManager {
 
   @Test
   public void testListVolumes() throws IOException, OzoneException {
-
-    String user0 = "testListVolumes-user-0";
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    String user0 = ugi.getUserName();
     String user1 = "testListVolumes-user-1";
     String adminUser = "testListVolumes-admin";
     ListArgs listVolumeArgs;
@@ -1072,9 +1075,7 @@ public class TestOzoneManager {
     listVolumeArgs = new ListArgs(userArgs1, null, 100, null);
     listVolumeArgs.setRootScan(false);
     volumes = storageHandler.listVolumes(listVolumeArgs);
-    Assert.assertEquals(10, volumes.getVolumes().size());
-    Assert.assertEquals(user1,
-        volumes.getVolumes().get(3).getOwner().getName());
+    Assert.assertEquals(0, volumes.getVolumes().size());
 
     // Make sure all available fields are returned
     final String user0vol4 = "Vol-" + user0 + "-4";

+ 464 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneNativeAuthorizer.java

@@ -0,0 +1,464 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.security.acl;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.om.BucketManagerImpl;
+import org.apache.hadoop.ozone.om.IOzoneAcl;
+import org.apache.hadoop.ozone.om.KeyManagerImpl;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.PrefixManager;
+import org.apache.hadoop.ozone.om.PrefixManagerImpl;
+import org.apache.hadoop.ozone.om.VolumeManagerImpl;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.ANONYMOUS;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.GROUP;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.USER;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.WORLD;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.NONE;
+import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.BUCKET;
+import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.KEY;
+import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.PREFIX;
+import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.VOLUME;
+import static org.apache.hadoop.ozone.security.acl.OzoneObj.StoreType.OZONE;
+import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test class for {@link OzoneNativeAuthorizer}.
+ */
+@RunWith(Parameterized.class)
+public class TestOzoneNativeAuthorizer {
+
+  private static OzoneConfiguration ozConfig;
+  private String vol;
+  private String buck;
+  private String key;
+  private String prefix;
+  private ACLType parentDirUserAcl;
+  private ACLType parentDirGroupAcl;
+  private boolean expectedAclResult;
+
+  private static KeyManagerImpl keyManager;
+  private static VolumeManagerImpl volumeManager;
+  private static BucketManagerImpl bucketManager;
+  private static PrefixManager prefixManager;
+  private static OMMetadataManager metadataManager;
+  private static OzoneNativeAuthorizer nativeAuthorizer;
+
+  private static StorageContainerManager scm;
+  private static UserGroupInformation ugi;
+
+  private static OzoneObj volObj;
+  private static OzoneObj buckObj;
+  private static OzoneObj keyObj;
+  private static OzoneObj prefixObj;
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+        {"key", "dir1/", ALL, ALL, true},
+        {"file1", "2019/june/01/", ALL, ALL, true},
+        {"file2", "", ALL, ALL, true},
+        {"dir1/dir2/dir4/", "", ALL, ALL, true},
+        {"key", "dir1/", NONE, NONE, false},
+        {"file1", "2019/june/01/", NONE, NONE, false},
+        {"file2", "", NONE, NONE, false},
+        {"dir1/dir2/dir4/", "", NONE, NONE, false}
+    });
+  }
+
+  public TestOzoneNativeAuthorizer(String keyName, String prefixName,
+      ACLType userRight,
+      ACLType groupRight, boolean expectedResult) throws IOException {
+    int randomInt = RandomUtils.nextInt();
+    vol = "vol" + randomInt;
+    buck = "bucket" + randomInt;
+    key = keyName + randomInt;
+    prefix = prefixName + randomInt + OZONE_URI_DELIMITER;
+    parentDirUserAcl = userRight;
+    parentDirGroupAcl = groupRight;
+    expectedAclResult = expectedResult;
+
+    createVolume(vol);
+    createBucket(vol, buck);
+    createKey(vol, buck, key);
+  }
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ozConfig = new OzoneConfiguration();
+    ozConfig.set(OZONE_ACL_AUTHORIZER_CLASS,
+        OZONE_ACL_AUTHORIZER_CLASS_NATIVE);
+    File dir = GenericTestUtils.getRandomizedTestDir();
+    ozConfig.set(OZONE_METADATA_DIRS, dir.toString());
+    ozConfig.set(OZONE_ADMINISTRATORS, OZONE_ADMINISTRATORS_WILDCARD);
+
+    metadataManager = new OmMetadataManagerImpl(ozConfig);
+    volumeManager = new VolumeManagerImpl(metadataManager, ozConfig);
+    bucketManager = new BucketManagerImpl(metadataManager);
+    prefixManager = new PrefixManagerImpl(metadataManager);
+
+    NodeManager nodeManager = new MockNodeManager(true, 10);
+    SCMConfigurator configurator = new SCMConfigurator();
+    configurator.setScmNodeManager(nodeManager);
+    scm = TestUtils.getScm(ozConfig, configurator);
+    scm.start();
+    scm.exitSafeMode();
+    keyManager =
+        new KeyManagerImpl(scm.getBlockProtocolServer(), metadataManager,
+            ozConfig,
+            "om1", null);
+
+    nativeAuthorizer = new OzoneNativeAuthorizer(volumeManager, bucketManager,
+        keyManager, prefixManager);
+    //keySession.
+    ugi = UserGroupInformation.getCurrentUser();
+  }
+
+  private void createKey(String volume,
+      String bucket, String keyName) throws IOException {
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(volume)
+        .setBucketName(bucket)
+        .setKeyName(keyName)
+        .setFactor(HddsProtos.ReplicationFactor.ONE)
+        .setDataSize(0)
+        .setType(HddsProtos.ReplicationType.STAND_ALONE)
+        .setAcls(OzoneUtils.getAclList(ugi.getUserName(), ugi.getGroups(),
+            ALL, ALL))
+        .build();
+
+    if (keyName.split(OZONE_URI_DELIMITER).length > 1) {
+      keyManager.createDirectory(keyArgs);
+      key = key + OZONE_URI_DELIMITER;
+    } else {
+      OpenKeySession keySession = keyManager.createFile(keyArgs, true, false);
+      keyArgs.setLocationInfoList(
+          keySession.getKeyInfo().getLatestVersionLocations()
+              .getLocationList());
+      keyManager.commitKey(keyArgs, keySession.getId());
+    }
+
+    keyObj = new OzoneObjInfo.Builder()
+        .setVolumeName(vol)
+        .setBucketName(buck)
+        .setKeyName(key)
+        .setResType(KEY)
+        .setStoreType(OZONE)
+        .build();
+  }
+
+  private void createBucket(String volumeName, String bucketName)
+      throws IOException {
+    OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .build();
+    bucketManager.createBucket(bucketInfo);
+    buckObj = new OzoneObjInfo.Builder()
+        .setVolumeName(vol)
+        .setBucketName(buck)
+        .setResType(BUCKET)
+        .setStoreType(OZONE)
+        .build();
+  }
+
+  private void createVolume(String volumeName) throws IOException {
+    OmVolumeArgs volumeArgs = OmVolumeArgs.newBuilder()
+        .setVolume(volumeName)
+        .setAdminName("bilbo")
+        .setOwnerName("bilbo")
+        .build();
+    volumeManager.createVolume(volumeArgs);
+    volObj = new OzoneObjInfo.Builder()
+        .setVolumeName(vol)
+        .setResType(VOLUME)
+        .setStoreType(OZONE)
+        .build();
+  }
+
+  @Test
+  public void testCheckAccessForVolume() throws Exception {
+    expectedAclResult = true;
+    resetAclsAndValidateAccess(volObj, USER, volumeManager);
+    resetAclsAndValidateAccess(volObj, GROUP, volumeManager);
+    resetAclsAndValidateAccess(volObj, WORLD, volumeManager);
+    resetAclsAndValidateAccess(volObj, ANONYMOUS, volumeManager);
+  }
+
+  @Test
+  public void testCheckAccessForBucket() throws Exception {
+
+    OzoneAcl userAcl = new OzoneAcl(USER, ugi.getUserName(), parentDirUserAcl);
+    OzoneAcl groupAcl = new OzoneAcl(GROUP, ugi.getGroups().size() > 0 ?
+        ugi.getGroups().get(0) : "", parentDirGroupAcl);
+    // Set access for volume.
+    volumeManager.setAcl(volObj, Arrays.asList(userAcl, groupAcl));
+
+    resetAclsAndValidateAccess(buckObj, USER, bucketManager);
+    resetAclsAndValidateAccess(buckObj, GROUP, bucketManager);
+    resetAclsAndValidateAccess(buckObj, WORLD, bucketManager);
+    resetAclsAndValidateAccess(buckObj, ANONYMOUS, bucketManager);
+  }
+
+  @Test
+  public void testCheckAccessForKey() throws Exception {
+    OzoneAcl userAcl = new OzoneAcl(USER, ugi.getUserName(), parentDirUserAcl);
+    OzoneAcl groupAcl = new OzoneAcl(GROUP, ugi.getGroups().size() > 0 ?
+        ugi.getGroups().get(0) : "", parentDirGroupAcl);
+    // Set access for volume, bucket & prefix.
+    volumeManager.setAcl(volObj, Arrays.asList(userAcl, groupAcl));
+    bucketManager.setAcl(buckObj, Arrays.asList(userAcl, groupAcl));
+    //prefixManager.setAcl(prefixObj, Arrays.asList(userAcl, groupAcl));
+
+    resetAclsAndValidateAccess(keyObj, USER, keyManager);
+    resetAclsAndValidateAccess(keyObj, GROUP, keyManager);
+    resetAclsAndValidateAccess(keyObj, WORLD, keyManager);
+    resetAclsAndValidateAccess(keyObj, ANONYMOUS, keyManager);
+  }
+
+  @Test
+  public void testCheckAccessForPrefix() throws Exception {
+    prefixObj = new OzoneObjInfo.Builder()
+        .setVolumeName(vol)
+        .setBucketName(buck)
+        .setPrefixName(prefix)
+        .setResType(PREFIX)
+        .setStoreType(OZONE)
+        .build();
+
+    OzoneAcl userAcl = new OzoneAcl(USER, ugi.getUserName(), parentDirUserAcl);
+    OzoneAcl groupAcl = new OzoneAcl(GROUP, ugi.getGroups().size() > 0 ?
+        ugi.getGroups().get(0) : "", parentDirGroupAcl);
+    // Set access for volume & bucket.
+    volumeManager.setAcl(volObj, Arrays.asList(userAcl, groupAcl));
+    bucketManager.setAcl(buckObj, Arrays.asList(userAcl, groupAcl));
+
+    resetAclsAndValidateAccess(prefixObj, USER, prefixManager);
+    resetAclsAndValidateAccess(prefixObj, GROUP, prefixManager);
+    resetAclsAndValidateAccess(prefixObj, WORLD, prefixManager);
+    resetAclsAndValidateAccess(prefixObj, ANONYMOUS, prefixManager);
+  }
+
+  private void resetAclsAndValidateAccess(OzoneObj obj,
+      ACLIdentityType accessType, IOzoneAcl aclImplementor)
+      throws IOException {
+
+    List<OzoneAcl> acls;
+    String user = "";
+    String group = "";
+
+    user = ugi.getUserName();
+    if (ugi.getGroups().size() > 0) {
+      group = ugi.getGroups().get(0);
+    }
+
+    RequestContext.Builder builder = new RequestContext.Builder()
+        .setClientUgi(ugi)
+        .setAclType(accessType);
+
+    // Get all acls.
+    List<ACLType> allAcls = Arrays.stream(ACLType.values()).
+        collect(Collectors.toList());
+
+    /**
+     * 1. Reset default acls to an acl.
+     * 2. Test if user/group has access only to it.
+     * 3. Add remaining acls one by one and then test
+     *    if user/group has access to them.
+     * */
+    for (ACLType a1 : allAcls) {
+      OzoneAcl newAcl = new OzoneAcl(accessType, getAclName(accessType), a1);
+
+      // Reset acls to only one right.
+      aclImplementor.setAcl(obj, Arrays.asList(newAcl));
+
+      // Fetch current acls and validate.
+      acls = aclImplementor.getAcl(obj);
+      assertTrue(acls.size() == 1);
+      assertTrue(acls.contains(newAcl));
+
+      // Special handling for ALL.
+      if (a1.equals(ALL)) {
+        validateAll(obj, builder);
+        continue;
+      }
+
+      // Special handling for NONE.
+      if (a1.equals(NONE)) {
+        validateNone(obj, builder);
+        continue;
+      }
+      assertEquals("Acl to check:" + a1 + " accessType:" +
+              accessType + " path:" + obj.getPath(),
+          expectedAclResult, nativeAuthorizer.checkAccess(obj,
+              builder.setAclRights(a1).build()));
+
+      List<ACLType> aclsToBeValidated =
+          Arrays.stream(ACLType.values()).collect(Collectors.toList());
+      List<ACLType> aclsToBeAdded =
+          Arrays.stream(ACLType.values()).collect(Collectors.toList());
+      aclsToBeValidated.remove(NONE);
+      aclsToBeValidated.remove(a1);
+
+      aclsToBeAdded.remove(NONE);
+      aclsToBeAdded.remove(ALL);
+
+      // Fetch acls again.
+      for (ACLType a2 : aclsToBeAdded) {
+        if (!a2.equals(a1)) {
+
+          acls = aclImplementor.getAcl(obj);
+          List right = acls.stream().map(a -> a.getAclList()).collect(
+              Collectors.toList());
+          assertFalse("Do not expected client to have " + a2 + " acl. " +
+                  "Current acls found:" + right + ". Type:" + accessType + ","
+                  + " name:" + (accessType == USER ? user : group),
+              nativeAuthorizer.checkAccess(obj,
+                  builder.setAclRights(a2).build()));
+
+          // Randomize next type.
+          int type = RandomUtils.nextInt(0, 3);
+          ACLIdentityType identityType = ACLIdentityType.values()[type];
+          // Add remaining acls one by one and then check access.
+          OzoneAcl addAcl = new OzoneAcl(identityType, 
+              getAclName(identityType), a2);
+          aclImplementor.addAcl(obj, addAcl);
+
+          // Fetch acls again.
+          acls = aclImplementor.getAcl(obj);
+          boolean a2AclFound = false;
+          boolean a1AclFound = false;
+          for (OzoneAcl acl : acls) {
+            if (acl.getAclList().contains(a2)) {
+              a2AclFound = true;
+            }
+            if (acl.getAclList().contains(a1)) {
+              a1AclFound = true;
+            }
+          }
+
+          assertTrue("Current acls :" + acls + ". " +
+              "Type:" + accessType + ", name:" + (accessType == USER ? user
+              : group) + " acl:" + a2, a2AclFound);
+          assertTrue("Expected client to have " + a1 + " acl. Current acls " +
+              "found:" + acls + ". Type:" + accessType +
+              ", name:" + (accessType == USER ? user : group), a1AclFound);
+          assertEquals("Current acls " + acls + ". Expect acl:" + a2 +
+                  " to be set? " + expectedAclResult + " accessType:"
+                  + accessType, expectedAclResult,
+              nativeAuthorizer.checkAccess(obj,
+                  builder.setAclRights(a2).build()));
+          aclsToBeValidated.remove(a2);
+          for (ACLType a3 : aclsToBeValidated) {
+            if (!a3.equals(a1) && !a3.equals(a2)) {
+              assertFalse("User shouldn't have right " + a3 + ". " +
+                      "Current acl rights for user:" + a1 + "," + a2,
+                  nativeAuthorizer.checkAccess(obj,
+                      builder.setAclRights(a3).build()));
+            }
+          }
+        }
+      }
+    }
+
+  }
+
+  private String getAclName(ACLIdentityType identityType) {
+    switch (identityType) {
+    case USER:
+      return ugi.getUserName();
+    case GROUP:
+      if (ugi.getGroups().size() > 0) {
+        return ugi.getGroups().get(0);
+      }
+    default:
+      return "";
+    }
+  }
+
+  /**
+   * Helper function to test acl rights with user/group had ALL acl bit set.
+   * @param obj
+   * @param builder
+   */
+  private void validateAll(OzoneObj obj, RequestContext.Builder
+      builder) throws OMException {
+    List<ACLType> allAcls = new ArrayList<>(Arrays.asList(ACLType.values()));
+    allAcls.remove(ALL);
+    allAcls.remove(NONE);
+    for (ACLType a : allAcls) {
+      assertEquals("User should have right " + a + ".", 
+          nativeAuthorizer.checkAccess(obj,
+          builder.setAclRights(a).build()), expectedAclResult);
+    }
+  }
+
+  /**
+   * Helper function to test acl rights with user/group had NONE acl bit set.
+   * @param obj
+   * @param builder
+   */
+  private void validateNone(OzoneObj obj, RequestContext.Builder
+      builder) throws OMException {
+    List<ACLType> allAcls = new ArrayList<>(Arrays.asList(ACLType.values()));
+    allAcls.remove(NONE);
+    for (ACLType a : allAcls) {
+      assertFalse("User shouldn't have right " + a + ".", 
+          nativeAuthorizer.checkAccess(obj, builder.setAclRights(a).build()));
+    }
+  }
+}

+ 3 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestVolume.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.ozone.client.rest.RestClient;
 import org.apache.hadoop.ozone.client.rpc.RpcClient;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 
@@ -232,7 +233,8 @@ public class TestVolume {
       clientProtocol.createVolume(volumeName);
     }
 
-    List<OzoneVolume> ovols = clientProtocol.listVolumes(null, null, 100);
+    List<OzoneVolume> ovols = clientProtocol.listVolumes(
+        UserGroupInformation.getCurrentUser().getUserName(), null, null, 100);
     assertTrue(ovols.size() >= 10);
   }
 

+ 46 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java

@@ -34,6 +34,8 @@ import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.security.acl.RequestContext;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 
@@ -44,6 +46,7 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.ozone.OzoneAcl.ZERO_BITSET;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INTERNAL_ERROR;
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
 /**
@@ -623,4 +626,47 @@ public class BucketManagerImpl implements BucketManager {
       metadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket);
     }
   }
+
+  /**
+   * Check access for given ozoneObject.
+   *
+   * @param ozObject object for which access needs to be checked.
+   * @param context Context object encapsulating all user related information.
+   * @return true if user has access else false.
+   */
+  @Override
+  public boolean checkAccess(OzoneObj ozObject, RequestContext context)
+      throws OMException {
+    Objects.requireNonNull(ozObject);
+    Objects.requireNonNull(context);
+
+    String volume = ozObject.getVolumeName();
+    String bucket = ozObject.getBucketName();
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket);
+    try {
+      String dbBucketKey = metadataManager.getBucketKey(volume, bucket);
+      OmBucketInfo bucketInfo =
+          metadataManager.getBucketTable().get(dbBucketKey);
+      if (bucketInfo == null) {
+        LOG.debug("Bucket:{}/{} does not exist", volume, bucket);
+        throw new OMException("Bucket " + bucket + " is not found",
+            BUCKET_NOT_FOUND);
+      }
+      boolean hasAccess = OzoneUtils.checkAclRights(bucketInfo.getAcls(),
+          context);
+      LOG.debug("user:{} has access rights for bucket:{} :{} ",
+          context.getClientUgi(), ozObject.getBucketName(), hasAccess);
+      return hasAccess;
+    } catch (IOException ex) {
+      if(ex instanceof OMException) {
+        throw (OMException) ex;
+      }
+      LOG.error("CheckAccess operation failed for bucket:{}/{} acl:{}",
+          volume, bucket, ex);
+      throw new OMException("Check access operation failed for " +
+          "bucket:" + bucket, ex, INTERNAL_ERROR);
+    } finally {
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket); 
+    }
+  }
 }

+ 13 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/IOzoneAcl.java

@@ -17,7 +17,9 @@
 package org.apache.hadoop.ozone.om;
 
 import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.security.acl.RequestContext;
 
 import java.io.IOException;
 import java.util.List;
@@ -64,4 +66,15 @@ public interface IOzoneAcl {
    * @throws IOException if there is error.
    * */
   List<OzoneAcl> getAcl(OzoneObj obj) throws IOException;
+
+  /**
+   * Check access for given ozoneObject.
+   *
+   * @param ozObject object for which access needs to be checked.
+   * @param context Context object encapsulating all user related information.
+   * @throws org.apache.hadoop.ozone.om.exceptions.OMException
+   * @return true if user has access else false.
+   */
+  boolean checkAccess(OzoneObj ozObject, RequestContext context)
+      throws OMException;
 }

+ 1 - 1
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java

@@ -43,7 +43,7 @@ import java.util.List;
 /**
  * Handles key level commands.
  */
-public interface KeyManager extends OzoneManagerFS {
+public interface KeyManager extends OzoneManagerFS, IOzoneAcl {
 
   /**
    * Start key manager.

+ 76 - 22
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java

@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -35,11 +36,11 @@ import java.security.PrivilegedExceptionAction;
 import java.util.stream.Collectors;
 
 import com.google.common.base.Strings;
+import com.google.protobuf.ByteString;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -78,10 +79,10 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .KeyLocation;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclRights;
 import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.security.acl.RequestContext;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.ozone.common.BlockGroup;
@@ -101,6 +102,7 @@ import org.apache.hadoop.utils.db.Table;
 
 import com.google.common.base.Preconditions;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT;
@@ -116,6 +118,10 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAU
 import static org.apache.hadoop.ozone.OzoneConsts.OM_MULTIPART_MIN_SIZE;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DIRECTORY_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INTERNAL_ERROR;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_KMS_PROVIDER;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
@@ -1398,15 +1404,13 @@ public class KeyManagerImpl implements KeyManager {
       OzoneAclInfo newAcl = null;
       for(OzoneAclInfo a: keyInfo.getAcls()) {
         if(a.getName().equals(acl.getName())) {
-          List<OzoneAclRights> rights =
-              new ArrayList<>(a.getRightsList());
-          for (IAccessAuthorizer.ACLType aclType : acl.getAclList()) {
-            rights.add(OzoneAclRights.valueOf(aclType.name()));
-          }
+          BitSet currentAcls = BitSet.valueOf(a.getRights().toByteArray());
+          currentAcls.or(acl.getAclBitSet());
+
           newAcl = OzoneAclInfo.newBuilder()
               .setType(a.getType())
               .setName(a.getName())
-              .addAllRights(rights)
+              .setRights(ByteString.copyFrom(currentAcls.toByteArray()))
               .build();
           newAcls.remove(a);
           newAcls.add(newAcl);
@@ -1484,15 +1488,13 @@ public class KeyManagerImpl implements KeyManager {
         // Acl to be removed might be a subset of existing acls.
         for(OzoneAclInfo a: keyInfo.getAcls()) {
           if(a.getName().equals(acl.getName())) {
-            List<OzoneAclRights> rights =
-                new ArrayList<>(a.getRightsList());
-            for (IAccessAuthorizer.ACLType aclType : acl.getAclList()) {
-              rights.remove(OzoneAclRights.valueOf(aclType.name()));
-            }
+            BitSet currentAcls = BitSet.valueOf(a.getRights().toByteArray());
+            acl.getAclBitSet().xor(currentAcls);
+            currentAcls.and(acl.getAclBitSet());
             newAcl = OzoneAclInfo.newBuilder()
                 .setType(a.getType())
                 .setName(a.getName())
-                .addAllRights(rights)
+                .setRights(ByteString.copyFrom(currentAcls.toByteArray()))
                 .build();
             newAcls.remove(a);
             newAcls.add(newAcl);
@@ -1636,6 +1638,58 @@ public class KeyManagerImpl implements KeyManager {
     }
   }
 
+  /**
+   * Check access for given ozoneObject.
+   *
+   * @param ozObject object for which access needs to be checked.
+   * @param context Context object encapsulating all user related information.
+   * @return true if user has access else false.
+   */
+  @Override
+  public boolean checkAccess(OzoneObj ozObject, RequestContext context)
+      throws OMException {
+    Objects.requireNonNull(ozObject);
+    Objects.requireNonNull(context);
+    Objects.requireNonNull(context.getClientUgi());
+
+    String volume = ozObject.getVolumeName();
+    String bucket = ozObject.getBucketName();
+    String keyName = ozObject.getKeyName();
+
+    metadataManager.getLock().acquireLock(BUCKET_LOCK, volume, bucket);
+    try {
+      validateBucket(volume, bucket);
+      String objectKey = metadataManager.getOzoneKey(volume, bucket, keyName);
+      OmKeyInfo keyInfo = metadataManager.getKeyTable().get(objectKey);
+      if (keyInfo == null) {
+        objectKey = OzoneFSUtils.addTrailingSlashIfNeeded(objectKey);
+        keyInfo = metadataManager.getKeyTable().get(objectKey);
+        
+        if(keyInfo == null) {
+          keyInfo = metadataManager.getOpenKeyTable().get(objectKey);
+          if (keyInfo == null) {
+            throw new OMException("Key not found, checkAccess failed. Key:" +
+                objectKey, KEY_NOT_FOUND);
+          }
+        }
+      }
+      boolean hasAccess = OzoneUtils.checkAclRight(keyInfo.getAcls(), context);
+      LOG.debug("user:{} has access rights for key:{} :{} ",
+          context.getClientUgi(), ozObject.getKeyName(), hasAccess);
+      return hasAccess;
+    } catch (IOException ex) {
+      if(ex instanceof OMException) {
+        throw (OMException) ex;
+      }
+      LOG.error("CheckAccess operation failed for key:{}/{}/{}", volume,
+          bucket, keyName, ex);
+      throw new OMException("Check access operation failed for " +
+          "key:" + keyName, ex, INTERNAL_ERROR);
+    } finally {
+      metadataManager.getLock().releaseLock(BUCKET_LOCK, volume, bucket);
+    }
+  }
+
   /**
    * Helper method to validate ozone object.
    * @param obj
@@ -1713,7 +1767,7 @@ public class KeyManagerImpl implements KeyManager {
           "such file exists:");
       throw new OMException("Unable to get file status: volume: " +
           volumeName + " bucket: " + bucketName + " key: " + keyName,
-          ResultCodes.FILE_NOT_FOUND);
+          FILE_NOT_FOUND);
     } finally {
       metadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
           bucketName);
@@ -1826,7 +1880,7 @@ public class KeyManagerImpl implements KeyManager {
           }
         }
       } catch (OMException ex) {
-        if (ex.getResult() != ResultCodes.FILE_NOT_FOUND) {
+        if (ex.getResult() != FILE_NOT_FOUND) {
           throw ex;
         }
       }
@@ -2016,12 +2070,12 @@ public class KeyManagerImpl implements KeyManager {
           return fileStatus;
         }
       } catch (OMException ex) {
-        if (ex.getResult() != ResultCodes.FILE_NOT_FOUND) {
+        if (ex.getResult() != FILE_NOT_FOUND) {
           throw ex;
-        } else if (ex.getResult() == ResultCodes.FILE_NOT_FOUND) {
+        } else if (ex.getResult() == FILE_NOT_FOUND) {
           if (directoryMustExist) {
             throw new OMException("Parent directory does not exist",
-                ex.getCause(), ResultCodes.DIRECTORY_NOT_FOUND);
+                ex.getCause(), DIRECTORY_NOT_FOUND);
           }
         }
       }
@@ -2037,8 +2091,8 @@ public class KeyManagerImpl implements KeyManager {
     if (ezInfo != null) {
       if (getKMSProvider() == null) {
         throw new OMException("Invalid KMS provider, check configuration " +
-            CommonConfigurationKeys.HADOOP_SECURITY_KEY_PROVIDER_PATH,
-            OMException.ResultCodes.INVALID_KMS_PROVIDER);
+            HADOOP_SECURITY_KEY_PROVIDER_PATH,
+            INVALID_KMS_PROVIDER);
       }
 
       final String ezKeyName = ezInfo.getKeyName();

+ 105 - 36
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java

@@ -32,6 +32,7 @@ import java.security.cert.CertificateException;
 import java.util.Collection;
 import java.util.Objects;
 
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.crypto.key.KeyProvider;
@@ -72,6 +73,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneIllegalArgumentException;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
 import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
@@ -102,7 +104,6 @@ import org.apache.hadoop.ozone.audit.AuditLoggerType;
 import org.apache.hadoop.ozone.audit.AuditMessage;
 import org.apache.hadoop.ozone.audit.Auditor;
 import org.apache.hadoop.ozone.audit.OMAction;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.common.Storage.StorageState;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
@@ -130,6 +131,7 @@ import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
 import org.apache.hadoop.ozone.security.acl.OzoneAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneNativeAuthorizer;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.security.acl.OzoneObj.StoreType;
 import org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType;
@@ -209,6 +211,8 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME_DEFAULT;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_AUTH_METHOD;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.S3_BUCKET_LOCK;
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
@@ -277,6 +281,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private File omRatisSnapshotDir;
   private final File ratisSnapshotFile;
   private long snapshotIndex;
+  private final Collection<String> ozAdmins;
 
   private KeyProviderCryptoExtension kmsProvider = null;
   private static String keyProviderUriKeyName =
@@ -341,7 +346,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
         OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
     startRatisServer();
     startRatisClient();
-
     if (isRatisEnabled) {
       // Create Ratis storage dir
       String omRatisDirectory = OmUtils.getOMRatisDirectory(configuration);
@@ -350,7 +354,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
             " must be defined.");
       }
       OmUtils.createOMDir(omRatisDirectory);
-
       // Create Ratis snapshot dir
       omRatisSnapshotDir = OmUtils.createOMDir(
           OmUtils.getOMRatisSnapshotDirectory(configuration));
@@ -367,9 +370,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
 
     InetSocketAddress omNodeRpcAddr = omNodeDetails.getRpcAddress();
     omRpcAddressTxt = new Text(omNodeDetails.getRpcAddressString());
-
     secConfig = new SecurityConfig(configuration);
-
     volumeManager = new VolumeManagerImpl(metadataManager, configuration);
 
     // Create the KMS Key Provider
@@ -407,9 +408,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     this.scmClient = new ScmClient(scmBlockClient, scmContainerClient);
     keyManager = new KeyManagerImpl(scmClient, metadataManager,
         configuration, omStorage.getOmId(), blockTokenMgr, getKmsProvider());
-
     prefixManager = new PrefixManagerImpl(metadataManager);
-
     shutdownHook = () -> {
       saveOmMetrics();
     };
@@ -419,9 +418,19 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
             OZONE_ACL_ENABLED_DEFAULT);
     if (isAclEnabled) {
       accessAuthorizer = getACLAuthorizerInstance(conf);
+      if (accessAuthorizer instanceof OzoneNativeAuthorizer) {
+        OzoneNativeAuthorizer authorizer =
+            (OzoneNativeAuthorizer) accessAuthorizer;
+        authorizer.setVolumeManager(volumeManager);
+        authorizer.setBucketManager(bucketManager);
+        authorizer.setKeyManager(keyManager);
+        authorizer.setPrefixManager(prefixManager);
+      }
     } else {
       accessAuthorizer = null;
     }
+    ozAdmins = conf.getTrimmedStringCollection(OzoneConfigKeys
+        .OZONE_ADMINISTRATORS);
     omMetaDir = OmUtils.getOmDbDir(configuration);
 
     this.scmBlockSize = (long) conf
@@ -1676,8 +1685,14 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   public void createVolume(OmVolumeArgs args) throws IOException {
     try {
       if(isAclEnabled) {
-        checkAcls(ResourceType.VOLUME, StoreType.OZONE,
-            ACLType.CREATE, args.getVolume(), null, null);
+        if (!ozAdmins.contains(OZONE_ADMINISTRATORS_WILDCARD) && 
+            !ozAdmins.contains(ProtobufRpcEngine.Server.getRemoteUser()
+                .getUserName())) {
+          LOG.error("Only admin users are authorized to create " +
+              "Ozone volumes.");
+          throw new OMException("Only admin users are authorized to create " +
+              "Ozone volumes.", ResultCodes.PERMISSION_DENIED);
+        }
       }
       metrics.incNumVolumeCreates();
       volumeManager.createVolume(args);
@@ -2003,8 +2018,13 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   public List<OmVolumeArgs> listVolumeByUser(String userName, String prefix,
       String prevKey, int maxKeys) throws IOException {
     if(isAclEnabled) {
-      checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.LIST, prefix,
-          null, null);
+      UserGroupInformation remoteUserUgi = ProtobufRpcEngine.Server.
+          getRemoteUser();
+      if (remoteUserUgi == null) {
+        LOG.error("Rpc user UGI is null. Authorization failed.");
+        throw new OMException("Rpc user UGI is null. Authorization " +
+            "failed.", ResultCodes.PERMISSION_DENIED);
+      }
     }
     boolean auditSuccess = true;
     Map<String, String> auditMap = new LinkedHashMap<>();
@@ -2043,8 +2063,14 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   public List<OmVolumeArgs> listAllVolumes(String prefix, String prevKey, int
       maxKeys) throws IOException {
     if(isAclEnabled) {
-      checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.LIST, prefix,
-          null, null);
+      if (!ozAdmins.contains(ProtobufRpcEngine.Server.
+          getRemoteUser().getUserName())
+          && !ozAdmins.contains(OZONE_ADMINISTRATORS_WILDCARD)) {
+        LOG.error("Only admin users are authorized to create " +
+            "Ozone volumes.");
+        throw new OMException("Only admin users are authorized to create " +
+            "Ozone volumes.", ResultCodes.PERMISSION_DENIED);
+      }
     }
     boolean auditSuccess = true;
     Map<String, String> auditMap = new LinkedHashMap<>();
@@ -2079,7 +2105,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   public void createBucket(OmBucketInfo bucketInfo) throws IOException {
     try {
       if(isAclEnabled) {
-        checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.CREATE,
+        checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.CREATE,
             bucketInfo.getVolumeName(), bucketInfo.getBucketName(), null);
       }
       metrics.incNumBucketCreates();
@@ -2175,8 +2201,19 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   @Override
   public OpenKeySession openKey(OmKeyArgs args) throws IOException {
     if(isAclEnabled) {
-      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.READ,
-          args.getVolumeName(), args.getBucketName(), args.getKeyName());
+      try {
+        checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.WRITE,
+            args.getVolumeName(), args.getBucketName(), args.getKeyName());
+      } catch (OMException ex) {
+        // For new keys key checkAccess call will fail as key doesn't exist.
+        // Check user access for bucket.
+        if (ex.getResult().equals(KEY_NOT_FOUND)) {
+          checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.WRITE,
+              args.getVolumeName(), args.getBucketName(), args.getKeyName());
+        } else {
+          throw ex;
+        }
+      }
     }
     boolean auditSuccess = true;
     try {
@@ -2245,8 +2282,19 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   public void commitKey(OmKeyArgs args, long clientID)
       throws IOException {
     if(isAclEnabled) {
-      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.WRITE,
-          args.getVolumeName(), args.getBucketName(), args.getKeyName());
+      try {
+        checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.WRITE,
+            args.getVolumeName(), args.getBucketName(), args.getKeyName());
+      } catch (OMException ex) {
+        // For new keys key checkAccess call will fail as key doesn't exist.
+        // Check user access for bucket.
+        if (ex.getResult().equals(KEY_NOT_FOUND)) {
+          checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.WRITE,
+              args.getVolumeName(), args.getBucketName(), args.getKeyName());
+        } else {
+          throw ex;
+        }
+      }
     }
     Map<String, String> auditMap = (args == null) ? new LinkedHashMap<>() :
         args.toAuditMap();
@@ -2276,11 +2324,21 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
 
   @Override
   public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
-      ExcludeList excludeList)
-      throws IOException {
+      ExcludeList excludeList) throws IOException {
     if(isAclEnabled) {
-      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.WRITE,
-          args.getVolumeName(), args.getBucketName(), args.getKeyName());
+      try {
+        checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.WRITE,
+            args.getVolumeName(), args.getBucketName(), args.getKeyName());
+      } catch (OMException ex) {
+        // For new keys key checkAccess call will fail as key doesn't exist.
+        // Check user access for bucket.
+        if (ex.getResult().equals(KEY_NOT_FOUND)) {
+          checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.WRITE,
+              args.getVolumeName(), args.getBucketName(), args.getKeyName());
+        } else {
+          throw ex;
+        }
+      }
     }
     boolean auditSuccess = true;
     Map<String, String> auditMap = (args == null) ? new LinkedHashMap<>() :
@@ -2414,8 +2472,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
       String startKey, String keyPrefix, int maxKeys) throws IOException {
     if(isAclEnabled) {
-      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.LIST, volumeName,
-          bucketName, keyPrefix);
+      checkAcls(ResourceType.BUCKET,
+          StoreType.OZONE, ACLType.LIST, volumeName, bucketName, keyPrefix);
     }
     boolean auditSuccess = true;
     Map<String, String> auditMap = buildAuditMap(volumeName);
@@ -2637,10 +2695,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     boolean acquiredS3Lock = false;
     boolean acquiredVolumeLock = false;
     try {
-      if(isAclEnabled) {
-        checkAcls(ResourceType.BUCKET, StoreType.S3, ACLType.CREATE,
-            null, s3BucketName, null);
-      }
       metrics.incNumBucketCreates();
       acquiredS3Lock = metadataManager.getLock().acquireLock(S3_BUCKET_LOCK,
           s3BucketName);
@@ -2684,8 +2738,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   public void deleteS3Bucket(String s3BucketName) throws IOException {
     try {
       if(isAclEnabled) {
-        checkAcls(ResourceType.BUCKET, StoreType.S3, ACLType.DELETE, null,
-            s3BucketName, null);
+        checkAcls(ResourceType.BUCKET, StoreType.S3, ACLType.DELETE, 
+            getS3VolumeName(), s3BucketName, null);
       }
       metrics.incNumBucketDeletes();
       s3BucketManager.deleteS3Bucket(s3BucketName);
@@ -2711,11 +2765,19 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       throws IOException {
     if(isAclEnabled) {
       checkAcls(ResourceType.BUCKET, StoreType.S3, ACLType.READ,
-          null, s3BucketName, null);
+          getS3VolumeName(), s3BucketName, null);
     }
     return s3BucketManager.getOzoneBucketMapping(s3BucketName);
   }
 
+  /**
+   * Helper function to return volume name for S3 users.
+   * */
+  private String getS3VolumeName() {
+    return s3BucketManager.formatOzoneVolumeName(DigestUtils.md5Hex(
+        ProtobufRpcEngine.Server.getRemoteUser().getUserName().toLowerCase()));
+  }
+
   @Override
   public List<OmBucketInfo> listS3Buckets(String userName, String startKey,
                                           String prefix, int maxNumOfBuckets)
@@ -2892,7 +2954,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   @Override
   public OzoneFileStatus getFileStatus(OmKeyArgs args) throws IOException {
     if (isAclEnabled) {
-      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.READ,
+      checkAcls(getResourceType(args), StoreType.OZONE, ACLType.READ,
           args.getVolumeName(), args.getBucketName(), args.getKeyName());
     }
     boolean auditSuccess = true;
@@ -2915,10 +2977,17 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
   }
 
+  private ResourceType getResourceType(OmKeyArgs args) {
+    if (args.getKeyName() == null || args.getKeyName().length() == 0) {
+      return ResourceType.BUCKET;
+    }
+    return ResourceType.KEY;
+  }
+
   @Override
   public void createDirectory(OmKeyArgs args) throws IOException {
     if (isAclEnabled) {
-      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.WRITE,
+      checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.WRITE,
           args.getVolumeName(), args.getBucketName(), args.getKeyName());
     }
     boolean auditSuccess = true;
@@ -2945,8 +3014,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   public OpenKeySession createFile(OmKeyArgs args, boolean overWrite,
       boolean recursive) throws IOException {
     if (isAclEnabled) {
-      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.WRITE,
-          args.getVolumeName(), args.getBucketName(), args.getKeyName());
+      checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.WRITE,
+          args.getVolumeName(), args.getBucketName(), null);
     }
     boolean auditSuccess = true;
     try {
@@ -2994,7 +3063,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   public List<OzoneFileStatus> listStatus(OmKeyArgs args, boolean recursive,
       String startKey, long numEntries) throws IOException {
     if(isAclEnabled) {
-      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.READ,
+      checkAcls(getResourceType(args), StoreType.OZONE, ACLType.READ,
           args.getVolumeName(), args.getBucketName(), args.getKeyName());
     }
     boolean auditSuccess = true;

+ 39 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManagerImpl.java

@@ -21,8 +21,10 @@ import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.security.acl.RequestContext;
 import org.apache.hadoop.ozone.util.RadixNode;
 import org.apache.hadoop.ozone.util.RadixTree;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.utils.db.*;
 import org.apache.hadoop.utils.db.Table.KeyValue;
 import org.slf4j.Logger;
@@ -273,6 +275,43 @@ public class PrefixManagerImpl implements PrefixManager {
     return EMPTY_ACL_LIST;
   }
 
+  /**
+   * Check access for given ozoneObject.
+   *
+   * @param ozObject object for which access needs to be checked.
+   * @param context Context object encapsulating all user related information.
+   * @return true if user has access else false.
+   */
+  @Override
+  public boolean checkAccess(OzoneObj ozObject, RequestContext context)
+      throws OMException {
+    Objects.requireNonNull(ozObject);
+    Objects.requireNonNull(context);
+
+    String prefixPath = ozObject.getPath();
+    metadataManager.getLock().acquireLock(PREFIX_LOCK, prefixPath);
+    try {
+      String longestPrefix = prefixTree.getLongestPrefix(prefixPath);
+      if (prefixPath.equals(longestPrefix)) {
+        RadixNode<OmPrefixInfo> lastNode =
+            prefixTree.getLastNodeInPrefixPath(prefixPath);
+        if (lastNode != null && lastNode.getValue() != null) {
+          boolean hasAccess = OzoneUtils.checkAclRights(lastNode.getValue().
+              getAcls(), context);
+          LOG.debug("user:{} has access rights for ozObj:{} ::{} ",
+              context.getClientUgi(), ozObject, hasAccess);
+          return hasAccess;
+        } else {
+          return true;
+        }
+      } else {
+        return true;
+      }
+    } finally {
+      metadataManager.getLock().releaseLock(PREFIX_LOCK, prefixPath);
+    }
+  }
+
   @Override
   public List<OmPrefixInfo> getLongestPrefixPath(String path) {
     String prefixPath = prefixTree.getLongestPrefix(path);

+ 21 - 3
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3BucketManagerImpl.java

@@ -21,10 +21,13 @@ package org.apache.hadoop.ozone.om;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.security.UserGroupInformation;
 
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_ALREADY_EXISTS;
 
@@ -33,6 +36,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Objects;
 
 import static org.apache.hadoop.ozone.OzoneConsts.OM_S3_VOLUME_PREFIX;
@@ -162,13 +166,17 @@ public class S3BucketManagerImpl implements S3BucketManager {
     boolean newVolumeCreate = true;
     String ozoneVolumeName = formatOzoneVolumeName(userName);
     try {
-      OmVolumeArgs args =
+      OmVolumeArgs.Builder builder =
           OmVolumeArgs.newBuilder()
               .setAdminName(S3_ADMIN_NAME)
               .setOwnerName(userName)
               .setVolume(ozoneVolumeName)
-              .setQuotaInBytes(OzoneConsts.MAX_QUOTA_IN_BYTES)
-              .build();
+              .setQuotaInBytes(OzoneConsts.MAX_QUOTA_IN_BYTES);
+      for (OzoneAcl acl : getDefaultAcls(userName)) {
+        builder.addOzoneAcls(OzoneAcl.toProtobuf(acl));
+      }
+
+      OmVolumeArgs args = builder.build();
       if (isRatisEnabled) {
         // When ratis is enabled we need to call apply also.
         volumeManager.applyCreateVolume(args, volumeManager.createVolume(args));
@@ -189,6 +197,15 @@ public class S3BucketManagerImpl implements S3BucketManager {
     return newVolumeCreate;
   }
 
+  /**
+   * Get default acls. 
+   * */
+  private List<OzoneAcl> getDefaultAcls(String userName) {
+    UserGroupInformation ugi = ProtobufRpcEngine.Server.getRemoteUser();
+    return OzoneAcl.parseAcls("user:" + (ugi == null ? userName :
+        ugi.getUserName()) + ":a,user:" + S3_ADMIN_NAME + ":a");
+  }
+
   private void createOzoneBucket(String volumeName, String bucketName)
       throws IOException {
     OmBucketInfo.Builder builder = OmBucketInfo.newBuilder();
@@ -198,6 +215,7 @@ public class S3BucketManagerImpl implements S3BucketManager {
             .setBucketName(bucketName)
             .setIsVersionEnabled(Boolean.FALSE)
             .setStorageType(StorageType.DEFAULT)
+            .setAcls(getDefaultAcls(null))
             .build();
     bucketManager.createBucket(bucketInfo);
   }

+ 62 - 4
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java

@@ -20,17 +20,22 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeOwnerChangeResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.security.acl.RequestContext;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.utils.db.BatchOperation;
 
 import com.google.common.base.Preconditions;
@@ -53,6 +58,7 @@ public class VolumeManagerImpl implements VolumeManager {
   private final OMMetadataManager metadataManager;
   private final int maxUserVolumeCount;
   private final boolean isRatisEnabled;
+  private final boolean aclEnabled;
 
   /**
    * Constructor.
@@ -67,6 +73,8 @@ public class VolumeManagerImpl implements VolumeManager {
     isRatisEnabled = conf.getBoolean(
         OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
         OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
+    aclEnabled = conf.getBoolean(OzoneConfigKeys.OZONE_ACL_ENABLED,
+        OzoneConfigKeys.OZONE_ACL_ENABLED_DEFAULT);
   }
 
   // Helpers to add and delete volume from user list
@@ -504,8 +512,7 @@ public class VolumeManagerImpl implements VolumeManager {
     } catch (IOException ex) {
       if (!(ex instanceof OMException)) {
         LOG.error("Check volume access failed for volume:{} user:{} rights:{}",
-            volume, userAcl.getName(),
-            StringUtils.join(userAcl.getRightsList(), ","), ex);
+            volume, userAcl.getName(), userAcl.getRights().toString(), ex);
       }
       throw ex;
     } finally {
@@ -521,8 +528,19 @@ public class VolumeManagerImpl implements VolumeManager {
       String prefix, String startKey, int maxKeys) throws IOException {
     metadataManager.getLock().acquireLock(USER_LOCK, userName);
     try {
-      return metadataManager.listVolumes(
+      List<OmVolumeArgs> volumes = metadataManager.listVolumes(
           userName, prefix, startKey, maxKeys);
+      UserGroupInformation userUgi = ProtobufRpcEngine.Server.
+          getRemoteUser();
+      if (userUgi == null || !aclEnabled) {
+        return volumes;
+      }
+
+      List<OmVolumeArgs> filteredVolumes = volumes.stream().
+          filter(v -> v.getAclMap().
+              hasAccess(IAccessAuthorizer.ACLType.LIST, userUgi))
+          .collect(Collectors.toList());
+      return filteredVolumes;
     } finally {
       metadataManager.getLock().releaseLock(USER_LOCK, userName);
     }
@@ -711,4 +729,44 @@ public class VolumeManagerImpl implements VolumeManager {
       metadataManager.getLock().releaseLock(VOLUME_LOCK, volume);
     }
   }
+
+  /**
+   * Check access for given ozoneObject.
+   *
+   * @param ozObject object for which access needs to be checked.
+   * @param context Context object encapsulating all user related information.
+   * @return true if user has access else false.
+   */
+  @Override
+  public boolean checkAccess(OzoneObj ozObject, RequestContext context)
+      throws OMException {
+    Objects.requireNonNull(ozObject);
+    Objects.requireNonNull(context);
+
+    String volume = ozObject.getVolumeName();
+    metadataManager.getLock().acquireLock(VOLUME_LOCK, volume);
+    try {
+      String dbVolumeKey = metadataManager.getVolumeKey(volume);
+      OmVolumeArgs volumeArgs =
+          metadataManager.getVolumeTable().get(dbVolumeKey);
+      if (volumeArgs == null) {
+        LOG.debug("volume:{} does not exist", volume);
+        throw new OMException("Volume " + volume + " is not found",
+            ResultCodes.VOLUME_NOT_FOUND);
+      }
+
+      Preconditions.checkState(volume.equals(volumeArgs.getVolume()));
+      boolean hasAccess = volumeArgs.getAclMap().hasAccess(
+          context.getAclRights(), context.getClientUgi());
+      LOG.debug("user:{} has access rights for volume:{} :{} ",
+          context.getClientUgi(), ozObject.getVolumeName(), hasAccess);
+      return hasAccess;
+    } catch (IOException ex) {
+      LOG.error("Check access operation failed for volume:{}", volume, ex);
+      throw new OMException("Check access operation failed for " +
+          "volume:" + volume, ex, ResultCodes.INTERNAL_ERROR);
+    } finally {
+      metadataManager.getLock().releaseLock(VOLUME_LOCK, volume);
+    }
+  }
 }

+ 2 - 4
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java

@@ -410,11 +410,9 @@ public class OzoneManagerRequestHandler implements RequestHandler {
   }
 
   private SetAclResponse setAcl(SetAclRequest req) throws IOException {
-    List<OzoneAcl> ozoneAcl = new ArrayList<>();
-    req.getAclList().forEach(a ->
-        ozoneAcl.add(OzoneAcl.fromProtobuf(a)));
     boolean response = impl.setAcl(OzoneObjInfo.fromProtobuf(req.getObj()),
-        ozoneAcl);
+        req.getAclList().stream().map(a -> OzoneAcl.fromProtobuf(a)).
+            collect(Collectors.toList()));
     return SetAclResponse.newBuilder().setResponse(response).build();
   }
 

+ 120 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneNativeAuthorizer.java

@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.security.acl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ozone.om.BucketManager;
+import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.om.PrefixManager;
+import org.apache.hadoop.ozone.om.VolumeManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
+
+/**
+ * Public API for Ozone ACLs. Security providers providing support for Ozone
+ * ACLs should implement this.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS", "Yarn", "Ranger", "Hive", "HBase"})
+@InterfaceStability.Evolving
+public class OzoneNativeAuthorizer implements IAccessAuthorizer {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OzoneNativeAuthorizer.class);
+  private VolumeManager volumeManager;
+  private BucketManager bucketManager;
+  private KeyManager keyManager;
+  private PrefixManager prefixManager;
+
+  public OzoneNativeAuthorizer() {
+  }
+
+  public OzoneNativeAuthorizer(VolumeManager volumeManager,
+      BucketManager bucketManager, KeyManager keyManager,
+      PrefixManager prefixManager) {
+    this.volumeManager = volumeManager;
+    this.bucketManager = bucketManager;
+    this.keyManager = keyManager;
+    this.prefixManager = prefixManager;
+  }
+
+  /**
+   * Check access for given ozoneObject.
+   *
+   * @param ozObject object for which access needs to be checked.
+   * @param context Context object encapsulating all user related information.
+   * @return true if user has access else false.
+   */
+  public boolean checkAccess(IOzoneObj ozObject, RequestContext context)
+      throws OMException {
+    Objects.requireNonNull(ozObject);
+    Objects.requireNonNull(context);
+    OzoneObjInfo objInfo;
+
+    if (ozObject instanceof OzoneObjInfo) {
+      objInfo = (OzoneObjInfo) ozObject;
+    } else {
+      throw new OMException("Unexpected input received. OM native acls are " +
+          "configured to work with OzoneObjInfo type only.", INVALID_REQUEST);
+    }
+
+    switch (objInfo.getResourceType()) {
+    case VOLUME:
+      LOG.trace("Checking access for volume:" + objInfo);
+      return volumeManager.checkAccess(objInfo, context);
+    case BUCKET:
+      LOG.trace("Checking access for bucket:" + objInfo);
+      return (bucketManager.checkAccess(objInfo, context)
+          && volumeManager.checkAccess(objInfo, context));
+    case KEY:
+      LOG.trace("Checking access for Key:" + objInfo);
+      return (keyManager.checkAccess(objInfo, context)
+          && prefixManager.checkAccess(objInfo, context)
+          && bucketManager.checkAccess(objInfo, context)
+          && volumeManager.checkAccess(objInfo, context));
+    case PREFIX:
+      LOG.trace("Checking access for Prefix:" + objInfo);
+      return (prefixManager.checkAccess(objInfo, context)
+          && bucketManager.checkAccess(objInfo, context)
+          && volumeManager.checkAccess(objInfo, context));
+    default:
+      throw new OMException("Unexpected object type:" +
+          objInfo.getResourceType(), INVALID_REQUEST);
+    }
+  }
+
+  public void setVolumeManager(VolumeManager volumeManager) {
+    this.volumeManager = volumeManager;
+  }
+
+  public void setBucketManager(BucketManager bucketManager) {
+    this.bucketManager = bucketManager;
+  }
+
+  public void setKeyManager(KeyManager keyManager) {
+    this.keyManager = keyManager;
+  }
+
+  public void setPrefixManager(PrefixManager prefixManager) {
+    this.prefixManager = prefixManager;
+  }
+}

+ 22 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/acl/package-info.java

@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.security.acl;
+
+/**
+ * OM native acl implementation.
+ */

+ 1 - 1
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/web/ozShell/volume/ListVolumeHandler.java

@@ -77,7 +77,7 @@ public class ListVolumeHandler extends Handler {
     OzoneClient client = address.createClient(createOzoneConfiguration());
 
     if (userName == null) {
-      userName = UserGroupInformation.getCurrentUser().getShortUserName();
+      userName = UserGroupInformation.getCurrentUser().getUserName();
     }
 
     if (maxVolumes < 1) {

+ 15 - 4
hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java

@@ -43,6 +43,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.fs.permission.FsPermission;
+
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
+
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Progressable;
@@ -589,7 +593,7 @@ public class BasicOzoneFileSystem extends FileSystem {
           throw new FileAlreadyExistsException(String.format(
               "Can't make directory for path '%s', it is a file.", fPart));
         }
-      } catch (FileNotFoundException fnfe) {
+      } catch (FileNotFoundException | OMException fnfe) {
         LOG.trace("creating directory for fpart:{}", fPart);
         String key = pathToKey(fPart);
         String dirKey = addTrailingSlashIfNeeded(key);
@@ -626,9 +630,16 @@ public class BasicOzoneFileSystem extends FileSystem {
     LOG.trace("getFileStatus() path:{}", f);
     Path qualifiedPath = f.makeQualified(uri, workingDir);
     String key = pathToKey(qualifiedPath);
-    FileStatus status = convertFileStatus(
-        adapter.getFileStatus(key, uri, qualifiedPath, getUsername()));
-    return status;
+    FileStatus fileStatus = null;
+    try {
+      fileStatus = adapter.getFileStatus(key)
+        .makeQualified(uri, qualifiedPath, getUsername(), getUsername());
+    } catch (OMException ex) {
+      if (ex.getResult().equals(OMException.ResultCodes.KEY_NOT_FOUND)) {
+        throw new FileNotFoundException("File not found. path:" + f);
+      }
+    }
+    return fileStatus;
   }
 
   /**

+ 1 - 1
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java

@@ -378,7 +378,7 @@ public class SQLCLI  extends Configured implements Tool {
       for (OzoneAclInfo aclInfo : volumeInfo.getVolumeAclsList()) {
         String insertAclInfo =
             String.format(INSERT_ACL_INFO, adminName, ownerName, volumeName,
-                aclInfo.getType(), aclInfo.getName(), aclInfo.getRightsList());
+                aclInfo.getType(), aclInfo.getName(), aclInfo.getRights());
         executeSQL(conn, insertAclInfo);
       }
       break;