Browse Source

HDDS-1539. Implement addAcl,removeAcl,setAcl,getAcl for Volume. Contributed Ajay Kumar. (#847)

Ajay Yadav 6 năm trước cách đây
mục cha
commit
12be6ff2ff
25 tập tin đã thay đổi với 1112 bổ sung144 xóa
  1. 51 0
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ObjectStore.java
  2. 41 0
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
  3. 55 0
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
  4. 51 0
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
  5. 4 0
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
  6. 58 35
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OzoneAcl.java
  7. 1 1
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
  8. 2 2
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
  9. 85 44
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmOzoneAclMap.java
  10. 16 1
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeArgs.java
  11. 41 0
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
  12. 110 6
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
  13. 9 11
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OMPBHelper.java
  14. 10 5
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/IAccessAuthorizer.java
  15. 11 0
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObj.java
  16. 31 0
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObjInfo.java
  17. 64 1
      hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
  18. 48 15
      hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOzoneAcls.java
  19. 42 0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
  20. 3 3
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ozShell/TestOzoneShell.java
  21. 85 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
  22. 41 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManager.java
  23. 177 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
  24. 74 19
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
  25. 2 1
      hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java

+ 51 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ObjectStore.java

@@ -31,11 +31,13 @@ import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -444,4 +446,53 @@ public class ObjectStore {
     return proxy.getCanonicalServiceName();
   }
 
+  /**
+   * Add acl for Ozone object. Return true if acl is added successfully else
+   * false.
+   * @param obj Ozone object for which acl should be added.
+   * @param acl ozone acl top be added.
+   * @return true if acl is added successfully, else false.
+   * @throws IOException if there is error.
+   * */
+  public boolean addAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
+    return proxy.addAcl(obj, acl);
+  }
+
+  /**
+   * Remove acl for Ozone object. Return true if acl is removed successfully
+   * else false.
+   *
+   * @param obj Ozone object.
+   * @param acl Ozone acl to be removed.
+   * @return true if acl is added successfully, else false.
+   * @throws IOException if there is error.
+   */
+  public boolean removeAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
+    return proxy.removeAcl(obj, acl);
+  }
+
+  /**
+   * Acls to be set for given Ozone object. This operations reset ACL for given
+   * object to list of ACLs provided in argument.
+   *
+   * @param obj Ozone object.
+   * @param acls List of acls.
+   * @return true if acl is added successfully, else false.
+   * @throws IOException if there is error.
+   */
+  public boolean setAcl(OzoneObj obj, List<OzoneAcl> acls) throws IOException {
+    return proxy.setAcl(obj, acls);
+  }
+
+  /**
+   * Returns list of ACLs for given Ozone object.
+   *
+   * @param obj Ozone object.
+   * @return true if acl is added successfully, else false.
+   * @throws IOException if there is error.
+   */
+  public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException {
+    return proxy.getAcl(obj);
+  }
+
 }

+ 41 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java

@@ -43,6 +43,7 @@ import java.util.Map;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.token.Token;
 
@@ -621,4 +622,44 @@ public interface ClientProtocol {
   List<OzoneFileStatus> listStatus(String volumeName, String bucketName,
       String keyName, boolean recursive, String startKey, long numEntries)
       throws IOException;
+
+
+  /**
+   * Add acl for Ozone object. Return true if acl is added successfully else
+   * false.
+   * @param obj Ozone object for which acl should be added.
+   * @param acl ozone acl top be added.
+   *
+   * @throws IOException if there is error.
+   * */
+  boolean addAcl(OzoneObj obj, OzoneAcl acl) throws IOException;
+
+  /**
+   * Remove acl for Ozone object. Return true if acl is removed successfully
+   * else false.
+   * @param obj Ozone object.
+   * @param acl Ozone acl to be removed.
+   *
+   * @throws IOException if there is error.
+   * */
+  boolean removeAcl(OzoneObj obj, OzoneAcl acl) throws IOException;
+
+  /**
+   * Acls to be set for given Ozone object. This operations reset ACL for
+   * given object to list of ACLs provided in argument.
+   * @param obj Ozone object.
+   * @param acls List of acls.
+   *
+   * @throws IOException if there is error.
+   * */
+  boolean setAcl(OzoneObj obj, List<OzoneAcl> acls) throws IOException;
+
+  /**
+   * Returns list of ACLs for given Ozone object.
+   * @param obj Ozone object.
+   *
+   * @throws IOException if there is error.
+   * */
+  List<OzoneAcl> getAcl(OzoneObj obj) throws IOException;
+
 }

+ 55 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java

@@ -54,6 +54,7 @@ import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.web.response.ListBuckets;
 import org.apache.hadoop.ozone.web.response.ListKeys;
 import org.apache.hadoop.ozone.web.response.ListVolumes;
@@ -1121,4 +1122,58 @@ public class RestClient implements ClientProtocol {
     throw new UnsupportedOperationException(
         "Ozone REST protocol does not " + "support this operation.");
   }
+
+  /**
+   * Add acl for Ozone object. Return true if acl is added successfully else
+   * false.
+   *
+   * @param obj Ozone object for which acl should be added.
+   * @param acl ozone acl top be added.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public boolean addAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
+    throw new UnsupportedOperationException("Ozone REST protocol does not" +
+        " support this operation.");
+  }
+
+  /**
+   * Remove acl for Ozone object. Return true if acl is removed successfully
+   * else false.
+   *
+   * @param obj Ozone object.
+   * @param acl Ozone acl to be removed.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public boolean removeAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
+    throw new UnsupportedOperationException("Ozone REST protocol does not" +
+        " support this operation.");
+  }
+
+  /**
+   * Acls to be set for given Ozone object. This operations reset ACL for given
+   * object to list of ACLs provided in argument.
+   *
+   * @param obj Ozone object.
+   * @param acls List of acls.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public boolean setAcl(OzoneObj obj, List<OzoneAcl> acls) throws IOException {
+    throw new UnsupportedOperationException("Ozone REST protocol does not" +
+        " support this operation.");
+  }
+
+  /**
+   * Returns list of ACLs for given Ozone object.
+   *
+   * @param obj Ozone object.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException {
+    throw new UnsupportedOperationException("Ozone REST protocol does not" +
+        " support this operation.");
+  }
 }

+ 51 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java

@@ -87,6 +87,7 @@ import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
 import org.apache.hadoop.ozone.security.acl.OzoneAclConfig;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -1009,6 +1010,56 @@ public class RpcClient implements ClientProtocol, KeyProviderTokenIssuer {
         .listStatus(keyArgs, recursive, startKey, numEntries);
   }
 
+  /**
+   * Add acl for Ozone object. Return true if acl is added successfully else
+   * false.
+   *
+   * @param obj Ozone object for which acl should be added.
+   * @param acl ozone acl top be added.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public boolean addAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
+    return ozoneManagerClient.addAcl(obj, acl);
+  }
+
+  /**
+   * Remove acl for Ozone object. Return true if acl is removed successfully
+   * else false.
+   *
+   * @param obj Ozone object.
+   * @param acl Ozone acl to be removed.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public boolean removeAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
+    return ozoneManagerClient.removeAcl(obj, acl);
+  }
+
+  /**
+   * Acls to be set for given Ozone object. This operations reset ACL for given
+   * object to list of ACLs provided in argument.
+   *
+   * @param obj Ozone object.
+   * @param acls List of acls.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public boolean setAcl(OzoneObj obj, List<OzoneAcl> acls) throws IOException {
+    return ozoneManagerClient.setAcl(obj, acls);
+  }
+
+  /**
+   * Returns list of ACLs for given Ozone object.
+   *
+   * @param obj Ozone object.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException {
+    return ozoneManagerClient.getAcl(obj);
+  }
+
   private OzoneInputStream createInputStream(OmKeyInfo keyInfo,
       String requestId) throws IOException {
     LengthInputStream lengthInputStream = KeyInputStream

+ 4 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java

@@ -190,6 +190,7 @@ public final class OmUtils {
     case GetFileStatus:
     case LookupFile:
     case ListStatus:
+    case GetAcl:
       return true;
     case CreateVolume:
     case SetVolumeProperty:
@@ -216,6 +217,9 @@ public final class OmUtils {
     case ApplyInitiateMultiPartUpload:
     case CreateDirectory:
     case CreateFile:
+    case RemoveAcl:
+    case SetAcl:
+    case AddAcl:
       return false;
     default:
       LOG.error("CmdType {} is not categorized as readOnly or not.", cmdType);

+ 58 - 35
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OzoneAcl.java

@@ -19,10 +19,15 @@
 
 package org.apache.hadoop.ozone;
 
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclRights;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclType;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
 
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.List;
 import java.util.Objects;
 
@@ -36,10 +41,11 @@ import java.util.Objects;
  * <li>world::rw
  * </ul>
  */
+@JsonIgnoreProperties(value = {"aclBitSet"})
 public class OzoneAcl {
   private ACLIdentityType type;
   private String name;
-  private List<ACLType> rights;
+  private BitSet aclBitSet;
 
   /**
    * Constructor for OzoneAcl.
@@ -56,8 +62,8 @@ public class OzoneAcl {
    */
   public OzoneAcl(ACLIdentityType type, String name, ACLType acl) {
     this.name = name;
-    this.rights = new ArrayList<>();
-    this.rights.add(acl);
+    this.aclBitSet = new BitSet(ACLType.getNoOfAcls());
+    aclBitSet.set(acl.ordinal(), true);
     this.type = type;
     if (type == ACLIdentityType.WORLD && name.length() != 0) {
       throw new IllegalArgumentException("Unexpected name part in world type");
@@ -75,9 +81,20 @@ public class OzoneAcl {
    * @param name - Name of user
    * @param acls - Rights
    */
-  public OzoneAcl(ACLIdentityType type, String name, List<ACLType> acls) {
+  public OzoneAcl(ACLIdentityType type, String name, BitSet acls) {
+    Objects.requireNonNull(type);
+    Objects.requireNonNull(acls);
+
+    if(acls.cardinality() > ACLType.getNoOfAcls()) {
+      throw new IllegalArgumentException("Acl bitset passed has unexpected " +
+          "size. bitset size:" + acls.cardinality() + ", bitset:"
+          + acls.toString());
+    }
+
+    this.aclBitSet = (BitSet) acls.clone();
+    acls.stream().forEach(a -> aclBitSet.set(a));
+
     this.name = name;
-    this.rights = acls;
     this.type = type;
     if (type == ACLIdentityType.WORLD && name.length() != 0) {
       throw new IllegalArgumentException("Unexpected name part in world type");
@@ -105,9 +122,10 @@ public class OzoneAcl {
     }
 
     ACLIdentityType aclType = ACLIdentityType.valueOf(parts[0].toUpperCase());
-    List<ACLType> acls = new ArrayList<>();
+    BitSet acls = new BitSet(ACLType.getNoOfAcls());
+
     for (char ch : parts[2].toCharArray()) {
-      acls.add(ACLType.getACLRight(String.valueOf(ch)));
+      acls.set(ACLType.getACLRight(String.valueOf(ch)).ordinal());
     }
 
     // TODO : Support sanitation of these user names by calling into
@@ -115,9 +133,27 @@ public class OzoneAcl {
     return new OzoneAcl(aclType, parts[1], acls);
   }
 
+  public static OzoneAclInfo toProtobuf(OzoneAcl acl) {
+    OzoneAclInfo.Builder builder = OzoneAclInfo.newBuilder()
+        .setName(acl.getName())
+        .setType(OzoneAclType.valueOf(acl.getType().name()));
+    acl.getAclBitSet().stream().forEach(a ->
+        builder.addRights(OzoneAclRights.valueOf(ACLType.values()[a].name())));
+    return builder.build();
+  }
+
+  public static OzoneAcl fromProtobuf(OzoneAclInfo protoAcl) {
+    BitSet aclRights = new BitSet(ACLType.getNoOfAcls());
+    protoAcl.getRightsList().parallelStream().forEach(a ->
+        aclRights.set(a.ordinal()));
+
+    return new OzoneAcl(ACLIdentityType.valueOf(protoAcl.getType().name()),
+        protoAcl.getName(), aclRights);
+  }
+
   @Override
   public String toString() {
-    return type + ":" + name + ":" + ACLType.getACLString(rights);
+    return type + ":" + name + ":" + ACLType.getACLString(aclBitSet);
   }
 
   /**
@@ -131,7 +167,7 @@ public class OzoneAcl {
    */
   @Override
   public int hashCode() {
-    return Objects.hash(this.getName(), this.getRights().toString(),
+    return Objects.hash(this.getName(), this.getAclBitSet(),
                         this.getType().toString());
   }
 
@@ -149,8 +185,16 @@ public class OzoneAcl {
    *
    * @return - Rights
    */
-  public List<ACLType> getRights() {
-    return rights;
+  public BitSet getAclBitSet() {
+    return aclBitSet;
+  }
+
+  public List<ACLType> getAclList() {
+    List<ACLType> acls = new ArrayList<>(ACLType.getNoOfAcls());
+    if(aclBitSet !=  null) {
+      aclBitSet.stream().forEach(a -> acls.add(ACLType.values()[a]));
+    }
+    return acls;
   }
 
   /**
@@ -179,29 +223,8 @@ public class OzoneAcl {
       return false;
     }
     OzoneAcl otherAcl = (OzoneAcl) obj;
-    return otherAcl.toString().equals(this.toString());
-  }
-
-  /**
-   * ACL types.
-   */
-  public enum OzoneACLType {
-    USER(OzoneConsts.OZONE_ACL_USER_TYPE),
-    GROUP(OzoneConsts.OZONE_ACL_GROUP_TYPE),
-    WORLD(OzoneConsts.OZONE_ACL_WORLD_TYPE);
-
-    /**
-     * String value for this Enum.
-     */
-    private final String value;
-
-    /**
-     * Init OzoneACLtypes enum.
-     *
-     * @param val String type for this enum.
-     */
-    OzoneACLType(String val) {
-      value = val;
-    }
+    return otherAcl.getName().equals(this.getName()) &&
+        otherAcl.getType().equals(this.getType()) &&
+        otherAcl.getAclBitSet().equals(this.getAclBitSet());
   }
 }

+ 1 - 1
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java

@@ -195,6 +195,6 @@ public class OMException extends IOException {
 
     FILE_ALREADY_EXISTS,
 
-    NOT_A_FILE
+    NOT_A_FILE,
   }
 }

+ 2 - 2
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java

@@ -274,8 +274,8 @@ public final class OmBucketInfo extends WithMetadata implements Auditable {
     BucketInfo.Builder bib =  BucketInfo.newBuilder()
         .setVolumeName(volumeName)
         .setBucketName(bucketName)
-        .addAllAcls(acls.stream().map(
-            OMPBHelper::convertOzoneAcl).collect(Collectors.toList()))
+        .addAllAcls(acls.stream().map(OMPBHelper::convertOzoneAcl)
+            .collect(Collectors.toList()))
         .setIsVersionEnabled(isVersionEnabled)
         .setStorageType(storageType.toProto())
         .setCreationTime(creationTime)

+ 85 - 44
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmOzoneAclMap.java

@@ -18,18 +18,26 @@
 
 package org.apache.hadoop.ozone.om.helpers;
 
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.protocol.proto
     .OzoneManagerProtocolProtos.OzoneAclInfo;
 import org.apache.hadoop.ozone.protocol.proto
     .OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclRights;
 import org.apache.hadoop.ozone.protocol.proto
     .OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclType;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
 
+import java.util.BitSet;
 import java.util.List;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Objects;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclRights.ALL;
 
 /**
  * This helper class keeps a map of all user and their permissions.
@@ -37,7 +45,7 @@ import java.util.HashMap;
 @SuppressWarnings("ProtocolBufferOrdinal")
 public class OmOzoneAclMap {
   // per Acl Type user:rights map
-  private ArrayList<Map<String, List<OzoneAclRights>>> aclMaps;
+  private ArrayList<Map<String, BitSet>> aclMaps;
 
   OmOzoneAclMap() {
     aclMaps = new ArrayList<>();
@@ -46,18 +54,77 @@ public class OmOzoneAclMap {
     }
   }
 
-  private Map<String, List<OzoneAclRights>> getMap(OzoneAclType type) {
+  private Map<String, BitSet> getMap(OzoneAclType type) {
     return aclMaps.get(type.ordinal());
   }
 
   // For a given acl type and user, get the stored acl
-  private List<OzoneAclRights> getAcl(OzoneAclType type, String user) {
+  private BitSet getAcl(OzoneAclType type, String user) {
     return getMap(type).get(user);
   }
 
+  public List<OzoneAcl> getAcl() {
+    List<OzoneAcl> acls = new ArrayList<>();
+
+    for (OzoneAclType type : OzoneAclType.values()) {
+      aclMaps.get(type.ordinal()).entrySet().stream().
+          forEach(entry -> acls.add(new OzoneAcl(ACLIdentityType.
+              valueOf(type.name()), entry.getKey(), entry.getValue())));
+    }
+    return acls;
+  }
+
   // Add a new acl to the map
-  public void addAcl(OzoneAclInfo acl) {
-    getMap(acl.getType()).put(acl.getName(), acl.getRightsList());
+  public void addAcl(OzoneAcl acl) throws OMException {
+    Objects.requireNonNull(acl, "Acl should not be null.");
+    OzoneAclType aclType = OzoneAclType.valueOf(acl.getType().name());
+    if (!getMap(aclType).containsKey(acl.getName())) {
+      getMap(aclType).put(acl.getName(), acl.getAclBitSet());
+    } else {
+      // throw exception if acl is already added.
+      throw new OMException("Acl " + acl + " already exist.", INVALID_REQUEST);
+    }
+  }
+
+  // Add a new acl to the map
+  public void setAcls(List<OzoneAcl> acls) throws OMException {
+    Objects.requireNonNull(acls, "Acls should not be null.");
+    // Remove all Acls.
+    for (OzoneAclType type : OzoneAclType.values()) {
+      aclMaps.get(type.ordinal()).clear();
+    }
+
+    // Add acls.
+    for (OzoneAcl acl : acls) {
+      addAcl(acl);
+    }
+  }
+
+  // Add a new acl to the map
+  public void removeAcl(OzoneAcl acl) throws OMException {
+    Objects.requireNonNull(acl, "Acl should not be null.");
+    OzoneAclType aclType = OzoneAclType.valueOf(acl.getType().name());
+    if (getMap(aclType).containsKey(acl.getName())) {
+      getMap(aclType).remove(acl.getName());
+    } else {
+      // throw exception if acl is already added.
+      throw new OMException("Acl [" + acl + "] doesn't exist.",
+          INVALID_REQUEST);
+    }
+  }
+
+  // Add a new acl to the map
+  public void addAcl(OzoneAclInfo acl) throws OMException {
+    Objects.requireNonNull(acl, "Acl should not be null.");
+    if (!getMap(acl.getType()).containsKey(acl.getName())) {
+      BitSet acls = new BitSet(OzoneAclRights.values().length);
+      acl.getRightsList().parallelStream().forEach(a -> acls.set(a.ordinal()));
+      getMap(acl.getType()).put(acl.getName(), acls);
+    } else {
+      // throw exception if acl is already added.
+
+      throw new OMException("Acl " + acl + " already exist.", INVALID_REQUEST);
+    }
   }
 
   // for a given acl, check if the user has access rights
@@ -66,40 +133,14 @@ public class OmOzoneAclMap {
       return false;
     }
 
-    List<OzoneAclRights> storedRights = getAcl(acl.getType(), acl.getName());
-    if(storedRights == null) {
+    BitSet aclBitSet = getAcl(acl.getType(), acl.getName());
+    if (aclBitSet == null) {
       return false;
     }
 
-    for (OzoneAclRights right : storedRights) {
-      switch (right) {
-      case CREATE:
-        return (right == OzoneAclRights.CREATE)
-            || (right == OzoneAclRights.ALL);
-      case LIST:
-        return (right == OzoneAclRights.LIST)
-            || (right == OzoneAclRights.ALL);
-      case WRITE:
-        return (right == OzoneAclRights.WRITE)
-            || (right == OzoneAclRights.ALL);
-      case READ:
-        return (right == OzoneAclRights.READ)
-            || (right == OzoneAclRights.ALL);
-      case DELETE:
-        return (right == OzoneAclRights.DELETE)
-            || (right == OzoneAclRights.ALL);
-      case READ_ACL:
-        return (right == OzoneAclRights.READ_ACL)
-            || (right == OzoneAclRights.ALL);
-      case WRITE_ACL:
-        return (right == OzoneAclRights.WRITE_ACL)
-            || (right == OzoneAclRights.ALL);
-      case ALL:
-        return (right == OzoneAclRights.ALL);
-      case NONE:
-        return !(right == OzoneAclRights.NONE);
-      default:
-        return false;
+    for (OzoneAclRights right : acl.getRightsList()) {
+      if (aclBitSet.get(right.ordinal()) || aclBitSet.get(ALL.ordinal())) {
+        return true;
       }
     }
     return false;
@@ -108,15 +149,15 @@ public class OmOzoneAclMap {
   // Convert this map to OzoneAclInfo Protobuf List
   public List<OzoneAclInfo> ozoneAclGetProtobuf() {
     List<OzoneAclInfo> aclList = new LinkedList<>();
-    for (OzoneAclType type: OzoneAclType.values()) {
-      for (Map.Entry<String, List<OzoneAclRights>> entry :
+    for (OzoneAclType type : OzoneAclType.values()) {
+      for (Map.Entry<String, BitSet> entry :
           aclMaps.get(type.ordinal()).entrySet()) {
-        OzoneAclInfo aclInfo = OzoneAclInfo.newBuilder()
+        OzoneAclInfo.Builder builder = OzoneAclInfo.newBuilder()
             .setName(entry.getKey())
-            .setType(type)
-            .addAllRights(entry.getValue())
-            .build();
-        aclList.add(aclInfo);
+            .setType(type);
+        entry.getValue().stream().forEach(a ->
+            builder.addRights(OzoneAclRights.values()[a]));
+        aclList.add(builder.build());
       }
     }
 
@@ -125,7 +166,7 @@ public class OmOzoneAclMap {
 
   // Create map from list of OzoneAclInfos
   public static OmOzoneAclMap ozoneAclGetFromProtobuf(
-      List<OzoneAclInfo> aclList) {
+      List<OzoneAclInfo> aclList) throws OMException {
     OmOzoneAclMap aclMap = new OmOzoneAclMap();
     for (OzoneAclInfo acl : aclList) {
       aclMap.addAcl(acl);

+ 16 - 1
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeArgs.java

@@ -23,8 +23,10 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.audit.Auditable;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
 
@@ -77,6 +79,18 @@ public final class OmVolumeArgs extends WithMetadata implements Auditable {
     this.creationTime = time;
   }
 
+  public void addAcl(OzoneAcl acl) throws OMException {
+    this.aclMap.addAcl(acl);
+  }
+
+  public void setAcls(List<OzoneAcl> acls) throws OMException {
+    this.aclMap.setAcls(acls);
+  }
+
+  public void removeAcl(OzoneAcl acl) throws OMException {
+    this.aclMap.removeAcl(acl);
+  }
+
   /**
    * Returns the Admin Name.
    * @return String.
@@ -232,7 +246,8 @@ public final class OmVolumeArgs extends WithMetadata implements Auditable {
         .build();
   }
 
-  public static OmVolumeArgs getFromProtobuf(VolumeInfo volInfo) {
+  public static OmVolumeArgs getFromProtobuf(VolumeInfo volInfo)
+      throws OMException {
 
     OmOzoneAclMap aclMap =
         OmOzoneAclMap.ozoneAclGetFromProtobuf(volInfo.getVolumeAclsList());

+ 41 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.om.protocol;
 
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
@@ -45,6 +46,7 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.ozone.security.OzoneDelegationTokenSelector;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.token.TokenInfo;
 
@@ -464,5 +466,44 @@ public interface OzoneManagerProtocol
    */
   List<OzoneFileStatus> listStatus(OmKeyArgs keyArgs, boolean recursive,
       String startKey, long numEntries) throws IOException;
+
+  /**
+   * Add acl for Ozone object. Return true if acl is added successfully else
+   * false.
+   * @param obj Ozone object for which acl should be added.
+   * @param acl ozone acl top be added.
+   *
+   * @throws IOException if there is error.
+   * */
+  boolean addAcl(OzoneObj obj, OzoneAcl acl) throws IOException;
+
+  /**
+   * Remove acl for Ozone object. Return true if acl is removed successfully
+   * else false.
+   * @param obj Ozone object.
+   * @param acl Ozone acl to be removed.
+   *
+   * @throws IOException if there is error.
+   * */
+  boolean removeAcl(OzoneObj obj, OzoneAcl acl) throws IOException;
+
+  /**
+   * Acls to be set for given Ozone object. This operations reset ACL for
+   * given object to list of ACLs provided in argument.
+   * @param obj Ozone object.
+   * @param acls List of acls.
+   *
+   * @throws IOException if there is error.
+   * */
+  boolean setAcl(OzoneObj obj, List<OzoneAcl> acls) throws IOException;
+
+  /**
+   * Returns list of ACLs for given Ozone object.
+   * @param obj Ozone object.
+   *
+   * @throws IOException if there is error.
+   * */
+  List<OzoneAcl> getAcl(OzoneObj obj) throws IOException;
+
 }
 

+ 110 - 6
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolTranslator;
+import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.om.exceptions.NotLeaderException;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -55,6 +56,9 @@ import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AddAclResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetAclRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetAclResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneFileStatusProto;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.LookupFileRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.LookupFileResponse;
@@ -62,6 +66,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateF
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateFileResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListStatusRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListStatusResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AddAclRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateDirectoryRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusRequest;
@@ -107,6 +113,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Multipa
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RemoveAclRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RemoveAclResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RenameKeyRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RenewDelegationTokenResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.S3BucketInfoRequest;
@@ -117,12 +125,14 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.S3ListB
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.S3ListBucketsResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetAclRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetBucketPropertyRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetVolumePropertyRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
 import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
 import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
@@ -528,12 +538,11 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
 
     ListVolumeResponse resp =
         handleError(submitRequest(omRequest)).getListVolumeResponse();
-
-
-
-    return resp.getVolumeInfoList().stream()
-        .map(item -> OmVolumeArgs.getFromProtobuf(item))
-        .collect(Collectors.toList());
+    List<OmVolumeArgs> list = new ArrayList<>(resp.getVolumeInfoList().size());
+    for (VolumeInfo info : resp.getVolumeInfoList()) {
+      list.add(OmVolumeArgs.getFromProtobuf(info));
+    }
+    return list;
   }
 
   /**
@@ -1298,6 +1307,101 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
     return OmKeyInfo.getFromProtobuf(resp.getKeyInfo());
   }
 
+  /**
+   * Add acl for Ozone object. Return true if acl is added successfully else
+   * false.
+   *
+   * @param obj Ozone object for which acl should be added.
+   * @param acl ozone acl top be added.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public boolean addAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
+    AddAclRequest req = AddAclRequest.newBuilder()
+        .setObj(OzoneObj.toProtobuf(obj))
+        .setAcl(OzoneAcl.toProtobuf(acl))
+        .build();
+
+    OMRequest omRequest = createOMRequest(Type.AddAcl)
+        .setAddAclRequest(req)
+        .build();
+    AddAclResponse addAclResponse =
+        handleError(submitRequest(omRequest)).getAddAclResponse();
+
+    return addAclResponse.getResponse();
+  }
+
+  /**
+   * Remove acl for Ozone object. Return true if acl is removed successfully
+   * else false.
+   *
+   * @param obj Ozone object.
+   * @param acl Ozone acl to be removed.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public boolean removeAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
+    RemoveAclRequest req = RemoveAclRequest.newBuilder()
+        .setObj(OzoneObj.toProtobuf(obj))
+        .setAcl(OzoneAcl.toProtobuf(acl))
+        .build();
+
+    OMRequest omRequest = createOMRequest(Type.RemoveAcl)
+        .setRemoveAclRequest(req)
+        .build();
+    RemoveAclResponse response =
+        handleError(submitRequest(omRequest)).getRemoveAclResponse();
+
+    return response.getResponse();
+  }
+
+  /**
+   * Acls to be set for given Ozone object. This operations reset ACL for given
+   * object to list of ACLs provided in argument.
+   *
+   * @param obj Ozone object.
+   * @param acls List of acls.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public boolean setAcl(OzoneObj obj, List<OzoneAcl> acls) throws IOException {
+    SetAclRequest.Builder builder = SetAclRequest.newBuilder()
+        .setObj(OzoneObj.toProtobuf(obj));
+
+    acls.parallelStream().forEach(a -> builder.addAcl(OzoneAcl.toProtobuf(a)));
+
+    OMRequest omRequest = createOMRequest(Type.SetAcl)
+        .setSetAclRequest(builder.build())
+        .build();
+    OzoneManagerProtocolProtos.SetAclResponse response =
+        handleError(submitRequest(omRequest)).getSetAclResponse();
+
+    return response.getResponse();
+  }
+
+  /**
+   * Returns list of ACLs for given Ozone object.
+   *
+   * @param obj Ozone object.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException {
+    GetAclRequest req = GetAclRequest.newBuilder()
+        .setObj(OzoneObj.toProtobuf(obj))
+        .build();
+
+    OMRequest omRequest = createOMRequest(Type.GetAcl)
+        .setGetAclRequest(req)
+        .build();
+    GetAclResponse response =
+        handleError(submitRequest(omRequest)).getGetAclResponse();
+    List<OzoneAcl> acls = new ArrayList<>();
+    response.getAclsList().stream().forEach(a ->
+        acls.add(OzoneAcl.fromProtobuf(a)));
+    return acls;
+  }
+
   @Override
   public OpenKeySession createFile(OmKeyArgs args,
       boolean overWrite, boolean recursive) throws IOException {

+ 9 - 11
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OMPBHelper.java

@@ -41,12 +41,12 @@ import org.apache.hadoop.ozone.protocol.proto
 import org.apache.hadoop.ozone.protocol.proto
     .OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclRights;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.token.Token;
 
+import java.util.BitSet;
 import java.util.List;
 import java.util.ArrayList;
 
@@ -84,15 +84,14 @@ public final class OMPBHelper {
     default:
       throw new IllegalArgumentException("ACL type is not recognized");
     }
-    List<OzoneAclRights> aclRights = new ArrayList<>();
-
-    for (ACLType right : acl.getRights()) {
-      aclRights.add(OzoneAclRights.valueOf(right.name()));
-    }
+    List<OzoneAclRights> ozAclRights =
+        new ArrayList<>(acl.getAclBitSet().cardinality());
+    acl.getAclBitSet().stream().forEach(a -> ozAclRights.add(
+        OzoneAclRights.valueOf(ACLType.values()[a].name())));
 
     return OzoneAclInfo.newBuilder().setType(aclType)
         .setName(acl.getName())
-        .addAllRights(aclRights)
+        .addAllRights(ozAclRights)
         .build();
   }
 
@@ -122,10 +121,9 @@ public final class OMPBHelper {
       throw new IllegalArgumentException("ACL type is not recognized");
     }
 
-    List<IAccessAuthorizer.ACLType> aclRights = new ArrayList<>();
-    for (OzoneAclRights acl : aclInfo.getRightsList()) {
-      aclRights.add(ACLType.valueOf(acl.name()));
-    }
+    BitSet aclRights = new BitSet(ACLType.getNoOfAcls());
+    aclInfo.getRightsList().stream().forEach(a ->
+        aclRights.set(ACLType.valueOf(a.name()).ordinal()));
     return new OzoneAcl(aclType, aclInfo.getName(), aclRights);
   }
 

+ 10 - 5
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/IAccessAuthorizer.java

@@ -20,7 +20,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.ozone.OzoneConsts;
 
-import java.util.List;
+import java.util.BitSet;
 
 /**
  * Public API for Ozone ACLs. Security providers providing support for Ozone
@@ -54,6 +54,11 @@ public interface IAccessAuthorizer {
     WRITE_ACL,
     ALL,
     NONE;
+    private static int length = ACLType.values().length;
+
+    public static int getNoOfAcls() {
+      return length;
+    }
 
     /**
      * Returns the ACL rights based on passed in String.
@@ -86,7 +91,7 @@ public interface IAccessAuthorizer {
       case OzoneConsts.OZONE_ACL_NONE:
         return ACLType.NONE;
       default:
-        throw new IllegalArgumentException(type + " ACL right is not " +
+        throw new IllegalArgumentException("[" + type + "] ACL right is not " +
             "recognized");
       }
 
@@ -98,10 +103,10 @@ public interface IAccessAuthorizer {
      * @param acls ACLType
      * @return String representation of acl
      */
-    public static String getACLString(List<ACLType> acls) {
+    public static String getACLString(BitSet acls) {
       StringBuffer sb = new StringBuffer();
-      acls.forEach(acl -> {
-        sb.append(getAclString(acl));
+      acls.stream().forEach(acl -> {
+        sb.append(getAclString(ACLType.values()[acl]));
       });
       return sb.toString();
     }

+ 11 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObj.java

@@ -19,6 +19,10 @@ package org.apache.hadoop.ozone.security.acl;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneObj.ObjectType;
+
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneObj.StoreType.*;
 
 /**
  * Class representing an unique ozone object.
@@ -37,6 +41,13 @@ public abstract class OzoneObj implements IOzoneObj {
     this.storeType = storeType;
   }
 
+  public static OzoneManagerProtocolProtos.OzoneObj toProtobuf(OzoneObj obj) {
+    return OzoneManagerProtocolProtos.OzoneObj.newBuilder()
+        .setResType(ObjectType.valueOf(obj.getResourceType().name()))
+        .setStoreType(valueOf(obj.getStoreType().name()))
+        .setPath(obj.getPath()).build();
+  }
+
   public ResourceType getResourceType() {
     return resType;
   }

+ 31 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObjInfo.java

@@ -17,6 +17,9 @@
 package org.apache.hadoop.ozone.security.acl;
 
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+
+import java.util.StringTokenizer;
 
 /**
  * Class representing an ozone object.
@@ -69,6 +72,34 @@ public final class OzoneObjInfo extends OzoneObj {
     return keyName;
   }
 
+  public static OzoneObjInfo fromProtobuf(OzoneManagerProtocolProtos.OzoneObj
+      proto) {
+    Builder builder = new Builder()
+        .setResType(ResourceType.valueOf(proto.getResType().name()))
+        .setStoreType(StoreType.valueOf(proto.getStoreType().name()));
+    StringTokenizer tokenizer = new StringTokenizer(proto.getPath(),
+        OzoneConsts.OZONE_URI_DELIMITER);
+    // Set volume name.
+    if (tokenizer.hasMoreTokens()) {
+      builder.setVolumeName(tokenizer.nextToken());
+    }
+    // Set bucket name.
+    if (tokenizer.hasMoreTokens()) {
+      builder.setBucketName(tokenizer.nextToken());
+    }
+    // Set key name
+    if (tokenizer.hasMoreTokens()) {
+      StringBuffer sb = new StringBuffer();
+      while (tokenizer.hasMoreTokens()) {
+        sb.append(OzoneConsts.OZONE_URI_DELIMITER);
+        sb.append(tokenizer.nextToken());
+        sb.append(OzoneConsts.OZONE_URI_DELIMITER);
+      }
+      builder.setKeyName(sb.toString());
+    }
+    return builder.build();
+  }
+
   /**
    * Inner builder class.
    */

+ 64 - 1
hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto

@@ -86,6 +86,11 @@ enum Type {
   CreateFile = 72;
   LookupFile = 73;
   ListStatus = 74;
+  AddAcl = 75;
+  RemoveAcl = 76;
+  SetAcl = 77;
+  GetAcl = 78;
+
 }
 
 message OMRequest {
@@ -143,6 +148,10 @@ message OMRequest {
   optional CreateFileRequest                createFileRequest              = 72;
   optional LookupFileRequest                lookupFileRequest              = 73;
   optional ListStatusRequest                listStatusRequest              = 74;
+  optional AddAclRequest                    addAclRequest                  = 75;
+  optional RemoveAclRequest                 removeAclRequest               = 76;
+  optional SetAclRequest                    setAclRequest                  = 77;
+  optional GetAclRequest                    getAclRequest                  = 78;
 }
 
 message OMResponse {
@@ -203,6 +212,10 @@ message OMResponse {
   optional CreateFileResponse                 createFileResponse           = 72;
   optional LookupFileResponse                 lookupFileResponse           = 73;
   optional ListStatusResponse                 listStatusResponse           = 74;
+  optional AddAclResponse                    addAclResponse                = 75;
+  optional RemoveAclResponse                 removeAclResponse             = 76;
+  optional SetAclResponse                   setAclResponse                 = 77;
+  optional GetAclResponse                    getAclResponse                = 78;
 }
 
 enum Status {
@@ -446,6 +459,22 @@ message BucketArgs {
     repeated hadoop.hdds.KeyValue metadata = 7;
 }
 
+message OzoneObj {
+  enum ObjectType {
+    VOLUME = 1;
+    BUCKET = 2;
+    KEY = 3;
+  }
+
+  enum StoreType {
+    OZONE = 1;
+    S3 = 2;
+  }
+  required ObjectType resType = 1;
+  required StoreType storeType = 2  [default = S3];
+  required string path = 3;
+}
+
 message OzoneAclInfo {
     enum OzoneAclType {
         USER = 1;
@@ -471,12 +500,46 @@ message OzoneAclInfo {
     repeated OzoneAclRights rights = 3;
 }
 
+message GetAclRequest {
+  required OzoneObj obj = 1;
+}
+
+message GetAclResponse {
+  repeated OzoneAclInfo acls = 1;
+}
+
+message AddAclRequest {
+  required OzoneObj obj = 1;
+  required OzoneAclInfo acl = 2;
+}
+
+message AddAclResponse {
+  required bool response = 1;
+}
+
+message RemoveAclRequest {
+  required OzoneObj obj = 1;
+  required OzoneAclInfo acl = 2;
+}
+
+message RemoveAclResponse {
+  required bool response = 1;
+}
+
+message SetAclRequest {
+  required OzoneObj obj = 1;
+  repeated OzoneAclInfo acl = 2;
+}
+
+message SetAclResponse {
+  required bool response = 1;
+}
+
 message CreateBucketRequest {
     required BucketInfo bucketInfo = 1;
 }
 
 message CreateBucketResponse {
-
 }
 
 message InfoBucketRequest {

+ 48 - 15
hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/TestOzoneAcls.java

@@ -20,14 +20,16 @@ package org.apache.hadoop.ozone;
 
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
 
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.Test;
 
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Set;
 
 import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.*;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -119,54 +121,85 @@ public class TestOzoneAcls {
   }
 
   @Test
-  public void testAclValues() {
+  public void testAclValues() throws Exception {
     OzoneAcl acl = OzoneAcl.parseAcl("user:bilbo:rw");
     assertEquals(acl.getName(), "bilbo");
-    assertEquals(Arrays.asList(READ, WRITE), acl.getRights());
+    assertTrue(acl.getAclBitSet().get(READ.ordinal()));
+    assertTrue(acl.getAclBitSet().get(WRITE.ordinal()));
+    assertFalse(acl.getAclBitSet().get(ALL.ordinal()));
+    assertFalse(acl.getAclBitSet().get(READ_ACL.ordinal()));
     assertEquals(ACLIdentityType.USER, acl.getType());
 
     acl = OzoneAcl.parseAcl("user:bilbo:a");
     assertEquals("bilbo", acl.getName());
-    assertEquals(Arrays.asList(ALL), acl.getRights());
+    assertTrue(acl.getAclBitSet().get(ALL.ordinal()));
+    assertFalse(acl.getAclBitSet().get(WRITE.ordinal()));
     assertEquals(ACLIdentityType.USER, acl.getType());
 
     acl = OzoneAcl.parseAcl("user:bilbo:r");
     assertEquals("bilbo", acl.getName());
-    assertEquals(Arrays.asList(READ), acl.getRights());
+    assertTrue(acl.getAclBitSet().get(READ.ordinal()));
+    assertFalse(acl.getAclBitSet().get(ALL.ordinal()));
     assertEquals(ACLIdentityType.USER, acl.getType());
 
     acl = OzoneAcl.parseAcl("user:bilbo:w");
     assertEquals("bilbo", acl.getName());
-    assertEquals(Arrays.asList(WRITE), acl.getRights());
+    assertTrue(acl.getAclBitSet().get(WRITE.ordinal()));
+    assertFalse(acl.getAclBitSet().get(ALL.ordinal()));
     assertEquals(ACLIdentityType.USER, acl.getType());
 
     acl = OzoneAcl.parseAcl("group:hobbit:a");
     assertEquals(acl.getName(), "hobbit");
-    assertEquals(Arrays.asList(ALL), acl.getRights());
+    assertTrue(acl.getAclBitSet().get(ALL.ordinal()));
+    assertFalse(acl.getAclBitSet().get(READ.ordinal()));
     assertEquals(ACLIdentityType.GROUP, acl.getType());
 
     acl = OzoneAcl.parseAcl("world::a");
     assertEquals(acl.getName(), "");
-    assertEquals(Arrays.asList(ALL), acl.getRights());
+    assertTrue(acl.getAclBitSet().get(ALL.ordinal()));
+    assertFalse(acl.getAclBitSet().get(WRITE.ordinal()));
     assertEquals(ACLIdentityType.WORLD, acl.getType());
 
     acl = OzoneAcl.parseAcl("user:bilbo:rwdlncxy");
     assertEquals(acl.getName(), "bilbo");
-    assertEquals(Arrays.asList(READ, WRITE, DELETE, LIST, NONE, CREATE,
-        READ_ACL, WRITE_ACL), acl.getRights());
-    assertEquals(ACLIdentityType.USER, acl.getType());
+    assertTrue(acl.getAclBitSet().get(READ.ordinal()));
+    assertTrue(acl.getAclBitSet().get(WRITE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(DELETE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(LIST.ordinal()));
+    assertTrue(acl.getAclBitSet().get(NONE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(CREATE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(READ_ACL.ordinal()));
+    assertTrue(acl.getAclBitSet().get(WRITE_ACL.ordinal()));
+    assertFalse(acl.getAclBitSet().get(ALL.ordinal()));
 
     acl = OzoneAcl.parseAcl("group:hadoop:rwdlncxy");
     assertEquals(acl.getName(), "hadoop");
-    assertEquals(Arrays.asList(READ, WRITE, DELETE, LIST, NONE, CREATE,
-        READ_ACL, WRITE_ACL), acl.getRights());
+    assertTrue(acl.getAclBitSet().get(READ.ordinal()));
+    assertTrue(acl.getAclBitSet().get(WRITE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(DELETE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(LIST.ordinal()));
+    assertTrue(acl.getAclBitSet().get(NONE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(CREATE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(READ_ACL.ordinal()));
+    assertTrue(acl.getAclBitSet().get(WRITE_ACL.ordinal()));
+    assertFalse(acl.getAclBitSet().get(ALL.ordinal()));
     assertEquals(ACLIdentityType.GROUP, acl.getType());
 
     acl = OzoneAcl.parseAcl("world::rwdlncxy");
     assertEquals(acl.getName(), "");
-    assertEquals(Arrays.asList(READ, WRITE, DELETE, LIST, NONE, CREATE,
-        READ_ACL, WRITE_ACL), acl.getRights());
+    assertTrue(acl.getAclBitSet().get(READ.ordinal()));
+    assertTrue(acl.getAclBitSet().get(WRITE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(DELETE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(LIST.ordinal()));
+    assertTrue(acl.getAclBitSet().get(NONE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(CREATE.ordinal()));
+    assertTrue(acl.getAclBitSet().get(READ_ACL.ordinal()));
+    assertTrue(acl.getAclBitSet().get(WRITE_ACL.ordinal()));
+    assertFalse(acl.getAclBitSet().get(ALL.ordinal()));
     assertEquals(ACLIdentityType.WORLD, acl.getType());
+
+    LambdaTestUtils.intercept(IllegalArgumentException.class, "ACL right" +
+            " is not", () -> OzoneAcl.parseAcl("world::rwdlncxncxdfsfgbny"));
   }
 
 }

+ 42 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java

@@ -84,6 +84,8 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
 import org.apache.hadoop.ozone.s3.util.OzoneS3Util;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.Time;
@@ -97,9 +99,11 @@ import static org.hamcrest.CoreMatchers.either;
 import org.junit.Assert;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -2130,6 +2134,44 @@ public abstract class TestOzoneRpcClientAbstract {
         });
   }
 
+  @Test
+  public void testNativeAclsForVolume() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    OzoneObj ozObj = new OzoneObjInfo.Builder()
+        .setVolumeName(volumeName)
+        .setResType(OzoneObj.ResourceType.VOLUME)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .build();
+    // Get acls for volume.
+    List<OzoneAcl> volAcls = store.getAcl(ozObj);
+    volAcls.forEach(a -> assertTrue(volume.getAcls().contains(a)));
+
+    // Remove all acl's.
+    for (OzoneAcl a : volAcls) {
+      store.removeAcl(ozObj, a);
+    }
+    List<OzoneAcl> newAcls = store.getAcl(ozObj);
+    OzoneVolume finalVolume = store.getVolume(volumeName);
+    assertTrue(finalVolume.getAcls().size() == 0);
+    assertTrue(newAcls.size() == 0);
+
+    // Add acl's and then call getAcl.
+    for (OzoneAcl a : volAcls) {
+      assertFalse(finalVolume.getAcls().contains(a));
+      store.addAcl(ozObj, a);
+      finalVolume = store.getVolume(volumeName);
+      assertTrue(finalVolume.getAcls().contains(a));
+    }
+
+    // Reset acl's.
+    store.setAcl(ozObj, newAcls);
+    finalVolume = store.getVolume(volumeName);
+    newAcls = store.getAcl(ozObj);
+    assertTrue(newAcls.size() == 0);
+    assertTrue(finalVolume.getAcls().size() == 0);
+  }
 
   private byte[] generateData(int size, byte val) {
     byte[] chars = new byte[size];

+ 3 - 3
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ozShell/TestOzoneShell.java

@@ -745,8 +745,8 @@ public class TestOzoneShell {
     OzoneAcl acl = bucket.getAcls().get(aclSize);
     assertTrue(acl.getName().equals("frodo")
         && acl.getType() == ACLIdentityType.USER
-        && acl.getRights().contains(ACLType.READ)
-        && acl.getRights().contains(ACLType.WRITE));
+        && acl.getAclBitSet().get(ACLType.READ.ordinal())
+        && acl.getAclBitSet().get(ACLType.WRITE.ordinal()));
 
     args = new String[] {"bucket", "update",
         url + "/" + vol.getName() + "/" + bucketName, "--removeAcl",
@@ -758,7 +758,7 @@ public class TestOzoneShell {
     assertEquals(1 + aclSize, bucket.getAcls().size());
     assertTrue(acl.getName().equals("samwise")
         && acl.getType() == ACLIdentityType.GROUP
-        && acl.getRights().contains(ACLType.READ));
+        && acl.getAclBitSet().get(ACLType.READ.ordinal()));
 
     // test update bucket for a non-exist bucket
     args = new String[] {"bucket", "update",

+ 85 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java

@@ -29,6 +29,7 @@ import java.security.PublicKey;
 import java.security.KeyPair;
 import java.security.cert.CertificateException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Objects;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -69,6 +70,7 @@ import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneIllegalArgumentException;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
 import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
@@ -2984,6 +2986,89 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
   }
 
+  /**
+   * Add acl for Ozone object. Return true if acl is added successfully else
+   * false.
+   *
+   * @param obj Ozone object for which acl should be added.
+   * @param acl ozone acl top be added.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public boolean addAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
+    if(isAclEnabled) {
+      checkAcls(obj.getResourceType(), obj.getStoreType(), ACLType.WRITE_ACL,
+          obj.getVolumeName(), obj.getBucketName(), obj.getKeyName());
+    }
+    // TODO: Audit ACL operation.
+    if(obj.getResourceType().equals(ResourceType.VOLUME)) {
+      return volumeManager.addAcl(obj, acl);
+    }
+
+    return false;
+  }
+
+  /**
+   * Remove acl for Ozone object. Return true if acl is removed successfully
+   * else false.
+   *
+   * @param obj Ozone object.
+   * @param acl Ozone acl to be removed.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public boolean removeAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
+    if(isAclEnabled) {
+      checkAcls(obj.getResourceType(), obj.getStoreType(), ACLType.WRITE_ACL,
+          obj.getVolumeName(), obj.getBucketName(), obj.getKeyName());
+    }
+    if(obj.getResourceType().equals(ResourceType.VOLUME)) {
+      return volumeManager.removeAcl(obj, acl);
+    }
+
+    return false;
+  }
+
+  /**
+   * Acls to be set for given Ozone object. This operations reset ACL for given
+   * object to list of ACLs provided in argument.
+   *
+   * @param obj Ozone object.
+   * @param acls List of acls.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public boolean setAcl(OzoneObj obj, List<OzoneAcl> acls) throws IOException {
+    if(isAclEnabled) {
+      checkAcls(obj.getResourceType(), obj.getStoreType(), ACLType.WRITE_ACL,
+          obj.getVolumeName(), obj.getBucketName(), obj.getKeyName());
+    }
+    if(obj.getResourceType().equals(ResourceType.VOLUME)) {
+      return volumeManager.setAcl(obj, acls);
+    }
+
+    return false;
+  }
+
+  /**
+   * Returns list of ACLs for given Ozone object.
+   *
+   * @param obj Ozone object.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException {
+    if(isAclEnabled) {
+      checkAcls(obj.getResourceType(), obj.getStoreType(), ACLType.READ_ACL,
+          obj.getVolumeName(), obj.getBucketName(), obj.getKeyName());
+    }
+    if(obj.getResourceType().equals(ResourceType.VOLUME)) {
+      return volumeManager.getAcl(obj);
+    }
+
+    return Collections.emptyList();
+  }
+
   /**
    * Startup options.
    */

+ 41 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManager.java

@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.ozone.om;
 
+import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeOwnerChangeResponse;
@@ -23,6 +24,7 @@ import org.apache.hadoop.ozone.protocol.proto
     .OzoneManagerProtocolProtos.OzoneAclInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .VolumeList;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
 
 import java.io.IOException;
 import java.util.List;
@@ -141,4 +143,43 @@ public interface VolumeManager {
    */
   List<OmVolumeArgs> listVolumes(String userName, String prefix,
       String startKey, int maxKeys) throws IOException;
+
+  /**
+   * Add acl for Ozone object. Return true if acl is added successfully else
+   * false.
+   * @param obj Ozone object for which acl should be added.
+   * @param acl ozone acl top be added.
+   *
+   * @throws IOException if there is error.
+   * */
+  boolean addAcl(OzoneObj obj, OzoneAcl acl) throws IOException;
+
+  /**
+   * Remove acl for Ozone object. Return true if acl is removed successfully
+   * else false.
+   * @param obj Ozone object.
+   * @param acl Ozone acl to be removed.
+   *
+   * @throws IOException if there is error.
+   * */
+  boolean removeAcl(OzoneObj obj, OzoneAcl acl) throws IOException;
+
+  /**
+   * Acls to be set for given Ozone object. This operations reset ACL for
+   * given object to list of ACLs provided in argument.
+   * @param obj Ozone object.
+   * @param acls List of acls.
+   *
+   * @throws IOException if there is error.
+   * */
+  boolean setAcl(OzoneObj obj, List<OzoneAcl> acls) throws IOException;
+
+  /**
+   * Returns list of ACLs for given Ozone object.
+   * @param obj Ozone object.
+   *
+   * @throws IOException if there is error.
+   * */
+  List<OzoneAcl> getAcl(OzoneObj obj) throws IOException;
+
 }

+ 177 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java

@@ -19,15 +19,18 @@ package org.apache.hadoop.ozone.om;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeOwnerChangeResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.utils.db.BatchOperation;
 
 import com.google.common.base.Preconditions;
@@ -508,4 +511,178 @@ public class VolumeManagerImpl implements VolumeManager {
       metadataManager.getLock().releaseUserLock(userName);
     }
   }
+
+  /**
+   * Add acl for Ozone object. Return true if acl is added successfully else
+   * false.
+   *
+   * @param obj Ozone object for which acl should be added.
+   * @param acl ozone acl top be added.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public boolean addAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
+    Objects.requireNonNull(obj);
+    Objects.requireNonNull(acl);
+    if (!obj.getResourceType().equals(OzoneObj.ResourceType.VOLUME)) {
+      throw new IllegalArgumentException("Unexpected argument passed to " +
+          "VolumeManager. OzoneObj type:" + obj.getResourceType());
+    }
+    String volume = obj.getVolumeName();
+    metadataManager.getLock().acquireVolumeLock(volume);
+    try {
+      String dbVolumeKey = metadataManager.getVolumeKey(volume);
+      OmVolumeArgs volumeArgs =
+          metadataManager.getVolumeTable().get(dbVolumeKey);
+      if (volumeArgs == null) {
+        LOG.debug("volume:{} does not exist", volume);
+        throw new OMException("Volume " + volume + " is not found",
+            ResultCodes.VOLUME_NOT_FOUND);
+      }
+      volumeArgs.addAcl(acl);
+      metadataManager.getVolumeTable().put(dbVolumeKey, volumeArgs);
+
+      Preconditions.checkState(volume.equals(volumeArgs.getVolume()));
+      //return volumeArgs.getAclMap().hasAccess(userAcl);
+    } catch (IOException ex) {
+      if (!(ex instanceof OMException)) {
+        LOG.error("Add acl operation failed for volume:{} acl:{}",
+            volume, acl, ex);
+      }
+      throw ex;
+    } finally {
+      metadataManager.getLock().releaseVolumeLock(volume);
+    }
+
+    return true;
+  }
+
+  /**
+   * Remove acl for Ozone object. Return true if acl is removed successfully
+   * else false.
+   *
+   * @param obj Ozone object.
+   * @param acl Ozone acl to be removed.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public boolean removeAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
+    Objects.requireNonNull(obj);
+    Objects.requireNonNull(acl);
+    if (!obj.getResourceType().equals(OzoneObj.ResourceType.VOLUME)) {
+      throw new IllegalArgumentException("Unexpected argument passed to " +
+          "VolumeManager. OzoneObj type:" + obj.getResourceType());
+    }
+    String volume = obj.getVolumeName();
+    metadataManager.getLock().acquireVolumeLock(volume);
+    try {
+      String dbVolumeKey = metadataManager.getVolumeKey(volume);
+      OmVolumeArgs volumeArgs =
+          metadataManager.getVolumeTable().get(dbVolumeKey);
+      if (volumeArgs == null) {
+        LOG.debug("volume:{} does not exist", volume);
+        throw new OMException("Volume " + volume + " is not found",
+            ResultCodes.VOLUME_NOT_FOUND);
+      }
+      volumeArgs.removeAcl(acl);
+      metadataManager.getVolumeTable().put(dbVolumeKey, volumeArgs);
+
+      Preconditions.checkState(volume.equals(volumeArgs.getVolume()));
+      //return volumeArgs.getAclMap().hasAccess(userAcl);
+    } catch (IOException ex) {
+      if (!(ex instanceof OMException)) {
+        LOG.error("Remove acl operation failed for volume:{} acl:{}",
+            volume, acl, ex);
+      }
+      throw ex;
+    } finally {
+      metadataManager.getLock().releaseVolumeLock(volume);
+    }
+
+    return true;
+  }
+
+  /**
+   * Acls to be set for given Ozone object. This operations reset ACL for given
+   * object to list of ACLs provided in argument.
+   *
+   * @param obj Ozone object.
+   * @param acls List of acls.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public boolean setAcl(OzoneObj obj, List<OzoneAcl> acls) throws IOException {
+    Objects.requireNonNull(obj);
+    Objects.requireNonNull(acls);
+
+    if (!obj.getResourceType().equals(OzoneObj.ResourceType.VOLUME)) {
+      throw new IllegalArgumentException("Unexpected argument passed to " +
+          "VolumeManager. OzoneObj type:" + obj.getResourceType());
+    }
+    String volume = obj.getVolumeName();
+    metadataManager.getLock().acquireVolumeLock(volume);
+    try {
+      String dbVolumeKey = metadataManager.getVolumeKey(volume);
+      OmVolumeArgs volumeArgs =
+          metadataManager.getVolumeTable().get(dbVolumeKey);
+      if (volumeArgs == null) {
+        LOG.debug("volume:{} does not exist", volume);
+        throw new OMException("Volume " + volume + " is not found",
+            ResultCodes.VOLUME_NOT_FOUND);
+      }
+      volumeArgs.setAcls(acls);
+      metadataManager.getVolumeTable().put(dbVolumeKey, volumeArgs);
+
+      Preconditions.checkState(volume.equals(volumeArgs.getVolume()));
+      //return volumeArgs.getAclMap().hasAccess(userAcl);
+    } catch (IOException ex) {
+      if (!(ex instanceof OMException)) {
+        LOG.error("Set acl operation failed for volume:{} acls:{}",
+            volume, acls, ex);
+      }
+      throw ex;
+    } finally {
+      metadataManager.getLock().releaseVolumeLock(volume);
+    }
+
+    return true;
+  }
+
+  /**
+   * Returns list of ACLs for given Ozone object.
+   *
+   * @param obj Ozone object.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException {
+    Objects.requireNonNull(obj);
+
+    if (!obj.getResourceType().equals(OzoneObj.ResourceType.VOLUME)) {
+      throw new IllegalArgumentException("Unexpected argument passed to " +
+          "VolumeManager. OzoneObj type:" + obj.getResourceType());
+    }
+    String volume = obj.getVolumeName();
+    metadataManager.getLock().acquireVolumeLock(volume);
+    try {
+      String dbVolumeKey = metadataManager.getVolumeKey(volume);
+      OmVolumeArgs volumeArgs =
+          metadataManager.getVolumeTable().get(dbVolumeKey);
+      if (volumeArgs == null) {
+        LOG.debug("volume:{} does not exist", volume);
+        throw new OMException("Volume " + volume + " is not found",
+            ResultCodes.VOLUME_NOT_FOUND);
+      }
+
+      Preconditions.checkState(volume.equals(volumeArgs.getVolume()));
+      return volumeArgs.getAclMap().getAcl();
+    } catch (IOException ex) {
+      if (!(ex instanceof OMException)) {
+        LOG.error("Get acl operation failed for volume:{}", volume, ex);
+      }
+      throw ex;
+    } finally {
+      metadataManager.getLock().releaseVolumeLock(volume);
+    }
+  }
 }

+ 74 - 19
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java

@@ -26,6 +26,7 @@ import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -43,7 +44,7 @@ import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AddAclResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockRequest;
@@ -118,6 +119,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetVolu
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
+import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
 import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
 import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
 import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
@@ -127,6 +129,8 @@ import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.*;
+
 /**
  * Command Handler for OM requests. OM State Machine calls this handler for
  * deserializing the client request and sending it to OM.
@@ -339,20 +343,40 @@ public class OzoneManagerRequestHandler implements RequestHandler {
         createDirectory(request.getCreateDirectoryRequest());
         break;
       case CreateFile:
-        OzoneManagerProtocolProtos.CreateFileResponse createFileResponse =
+        CreateFileResponse createFileResponse =
             createFile(request.getCreateFileRequest());
         responseBuilder.setCreateFileResponse(createFileResponse);
         break;
       case LookupFile:
-        OzoneManagerProtocolProtos.LookupFileResponse lookupFileResponse =
+        LookupFileResponse lookupFileResponse =
             lookupFile(request.getLookupFileRequest());
         responseBuilder.setLookupFileResponse(lookupFileResponse);
         break;
       case ListStatus:
-        OzoneManagerProtocolProtos.ListStatusResponse listStatusResponse =
+        ListStatusResponse listStatusResponse =
             listStatus(request.getListStatusRequest());
         responseBuilder.setListStatusResponse(listStatusResponse);
         break;
+      case AddAcl:
+        AddAclResponse addAclResponse =
+            addAcl(request.getAddAclRequest());
+        responseBuilder.setAddAclResponse(addAclResponse);
+        break;
+      case RemoveAcl:
+        RemoveAclResponse removeAclResponse =
+            removeAcl(request.getRemoveAclRequest());
+        responseBuilder.setRemoveAclResponse(removeAclResponse);
+        break;
+      case SetAcl:
+        SetAclResponse setAclResponse =
+            setAcl(request.getSetAclRequest());
+        responseBuilder.setSetAclResponse(setAclResponse);
+        break;
+      case GetAcl:
+        GetAclResponse getAclResponse =
+            getAcl(request.getGetAclRequest());
+        responseBuilder.setGetAclResponse(getAclResponse);
+        break;
       default:
         responseBuilder.setSuccess(false);
         responseBuilder.setMessage("Unrecognized Command Type: " + cmdType);
@@ -369,6 +393,37 @@ public class OzoneManagerRequestHandler implements RequestHandler {
     return responseBuilder.build();
   }
 
+  private GetAclResponse getAcl(GetAclRequest req) throws IOException {
+    List<OzoneAclInfo> acls = new ArrayList<>();
+
+    List<OzoneAcl> aclList =
+        impl.getAcl(OzoneObjInfo.fromProtobuf(req.getObj()));
+    aclList.parallelStream().forEach(a -> acls.add(OzoneAcl.toProtobuf(a)));
+    return GetAclResponse.newBuilder().addAllAcls(acls).build();
+  }
+
+  private RemoveAclResponse removeAcl(RemoveAclRequest req)
+      throws IOException {
+    boolean response = impl.removeAcl(OzoneObjInfo.fromProtobuf(req.getObj()),
+        OzoneAcl.fromProtobuf(req.getAcl()));
+    return RemoveAclResponse.newBuilder().setResponse(response).build();
+  }
+
+  private SetAclResponse setAcl(SetAclRequest req) throws IOException {
+    List<OzoneAcl> ozoneAcl = new ArrayList<>();
+    req.getAclList().parallelStream().forEach(a ->
+        ozoneAcl.add(OzoneAcl.fromProtobuf(a)));
+    boolean response = impl.setAcl(OzoneObjInfo.fromProtobuf(req.getObj()),
+        ozoneAcl);
+    return SetAclResponse.newBuilder().setResponse(response).build();
+  }
+
+  private AddAclResponse addAcl(AddAclRequest req) throws IOException {
+    boolean response = impl.addAcl(OzoneObjInfo.fromProtobuf(req.getObj()),
+        OzoneAcl.fromProtobuf(req.getAcl()));
+    return AddAclResponse.newBuilder().setResponse(response).build();
+  }
+
   // Convert and exception to corresponding status code
   protected Status exceptionToResponseStatus(IOException ex) {
     if (ex instanceof OMException) {
@@ -899,7 +954,7 @@ public class OzoneManagerRequestHandler implements RequestHandler {
     List<OmPartInfo> omPartInfoList =
         omMultipartUploadListParts.getPartInfoList();
 
-    List<OzoneManagerProtocolProtos.PartInfo> partInfoList =
+    List<PartInfo> partInfoList =
         new ArrayList<>();
 
     omPartInfoList.forEach(partInfo -> partInfoList.add(partInfo.getProto()));
@@ -962,11 +1017,11 @@ public class OzoneManagerRequestHandler implements RequestHandler {
     return rb.build();
   }
 
-  private OzoneManagerProtocolProtos.GetS3SecretResponse getS3Secret(
-      OzoneManagerProtocolProtos.GetS3SecretRequest request)
+  private GetS3SecretResponse getS3Secret(
+      GetS3SecretRequest request)
       throws IOException {
-    OzoneManagerProtocolProtos.GetS3SecretResponse.Builder rb =
-        OzoneManagerProtocolProtos.GetS3SecretResponse.newBuilder();
+    GetS3SecretResponse.Builder rb =
+        GetS3SecretResponse.newBuilder();
 
     rb.setS3Secret(impl.getS3Secret(request.getKerberosID()).getProtobuf());
 
@@ -999,8 +1054,8 @@ public class OzoneManagerRequestHandler implements RequestHandler {
     impl.createDirectory(omKeyArgs);
   }
 
-  private OzoneManagerProtocolProtos.CreateFileResponse createFile(
-      OzoneManagerProtocolProtos.CreateFileRequest request) throws IOException {
+  private CreateFileResponse createFile(
+      CreateFileRequest request) throws IOException {
     KeyArgs keyArgs = request.getKeyArgs();
     OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
         .setVolumeName(keyArgs.getVolumeName())
@@ -1013,15 +1068,15 @@ public class OzoneManagerRequestHandler implements RequestHandler {
     OpenKeySession keySession =
         impl.createFile(omKeyArgs, request.getIsOverwrite(),
             request.getIsRecursive());
-    return OzoneManagerProtocolProtos.CreateFileResponse.newBuilder()
+    return CreateFileResponse.newBuilder()
         .setKeyInfo(keySession.getKeyInfo().getProtobuf())
         .setID(keySession.getId())
         .setOpenVersion(keySession.getOpenVersion())
         .build();
   }
 
-  private OzoneManagerProtocolProtos.LookupFileResponse lookupFile(
-      OzoneManagerProtocolProtos.LookupFileRequest request)
+  private LookupFileResponse lookupFile(
+      LookupFileRequest request)
       throws IOException {
     KeyArgs keyArgs = request.getKeyArgs();
     OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
@@ -1029,7 +1084,7 @@ public class OzoneManagerRequestHandler implements RequestHandler {
         .setBucketName(keyArgs.getBucketName())
         .setKeyName(keyArgs.getKeyName())
         .build();
-    return OzoneManagerProtocolProtos.LookupFileResponse.newBuilder()
+    return LookupFileResponse.newBuilder()
         .setKeyInfo(impl.lookupFile(omKeyArgs).getProtobuf())
         .build();
   }
@@ -1038,8 +1093,8 @@ public class OzoneManagerRequestHandler implements RequestHandler {
     return impl;
   }
 
-  private OzoneManagerProtocolProtos.ListStatusResponse listStatus(
-      OzoneManagerProtocolProtos.ListStatusRequest request) throws IOException {
+  private ListStatusResponse listStatus(
+      ListStatusRequest request) throws IOException {
     KeyArgs keyArgs = request.getKeyArgs();
     OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
         .setVolumeName(keyArgs.getVolumeName())
@@ -1049,9 +1104,9 @@ public class OzoneManagerRequestHandler implements RequestHandler {
     List<OzoneFileStatus> statuses =
         impl.listStatus(omKeyArgs, request.getRecursive(),
             request.getStartKey(), request.getNumEntries());
-    OzoneManagerProtocolProtos.ListStatusResponse.Builder
+    ListStatusResponse.Builder
         listStatusResponseBuilder =
-        OzoneManagerProtocolProtos.ListStatusResponse.newBuilder();
+        ListStatusResponse.newBuilder();
     for (OzoneFileStatus status : statuses) {
       listStatusResponseBuilder.addStatuses(status.getProtobuf());
     }

+ 2 - 1
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java

@@ -124,7 +124,8 @@ public class TestOzoneManagerRatisServer {
           .setClientId(clientId)
           .build();
       OmUtils.isReadOnly(request);
-      assertFalse(cmdtype + "is not categorized in OmUtils#isReadyOnly",
+      assertFalse(cmdtype + " is not categorized in " +
+              "OmUtils#isReadyOnly",
           logCapturer.getOutput().contains("CmdType " + cmdtype +" is not " +
               "categorized as readOnly or not."));
       logCapturer.clearOutput();