Przeglądaj źródła

HDDS-1541. Implement addAcl,removeAcl,setAcl,getAcl for Key. Contributed by Ajay Kumat. (#885)

Ajay Yadav 6 lat temu
rodzic
commit
3b1c2577d7
24 zmienionych plików z 837 dodań i 233 usunięć
  1. 19 10
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
  2. 2 1
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OzoneAcl.java
  3. 16 2
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
  4. 23 3
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
  5. 30 4
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmOzoneAclMap.java
  6. 11 0
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
  7. 33 24
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObjInfo.java
  8. 29 0
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/web/utils/OzoneUtils.java
  9. 2 0
      hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
  10. 175 109
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
  11. 2 0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestMultipleContainerReadWrite.java
  12. 3 0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
  13. 4 0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java
  14. 2 0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestScmSafeMode.java
  15. 7 0
      hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
  16. 62 54
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
  17. 40 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
  18. 340 19
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
  19. 8 3
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
  20. 2 2
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java
  21. 2 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
  22. 12 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
  23. 2 0
      hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
  24. 11 2
      hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java

+ 19 - 10
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java

@@ -405,15 +405,7 @@ public class RpcClient implements ClientProtocol, KeyProviderTokenIssuer {
           .setKeyName(bucketArgs.getEncryptionKey()).build();
     }
 
-    List<OzoneAcl> listOfAcls = new ArrayList<>();
-    //User ACL
-    listOfAcls.add(new OzoneAcl(ACLIdentityType.USER,
-        ugi.getUserName(), userRights));
-    //Group ACLs of the User
-    List<String> userGroups = Arrays.asList(UserGroupInformation
-        .createRemoteUser(ugi.getUserName()).getGroupNames());
-    userGroups.stream().forEach((group) -> listOfAcls.add(
-        new OzoneAcl(ACLIdentityType.GROUP, group, groupRights)));
+    List<OzoneAcl> listOfAcls = getAclList();
     //ACLs from BucketArgs
     if(bucketArgs.getAcls() != null) {
       listOfAcls.addAll(bucketArgs.getAcls());
@@ -437,6 +429,16 @@ public class RpcClient implements ClientProtocol, KeyProviderTokenIssuer {
     ozoneManagerClient.createBucket(builder.build());
   }
 
+  /**
+   * Helper function to get default acl list for current user.
+   *
+   * @return listOfAcls
+   * */
+  private List<OzoneAcl> getAclList() {
+    return OzoneUtils.getAclList(ugi.getUserName(), ugi.getGroups(),
+        userRights, groupRights);
+  }
+
   @Override
   public void addBucketAcls(
       String volumeName, String bucketName, List<OzoneAcl> addAcls)
@@ -629,6 +631,7 @@ public class RpcClient implements ClientProtocol, KeyProviderTokenIssuer {
         .setType(HddsProtos.ReplicationType.valueOf(type.toString()))
         .setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue()))
         .addAllMetadata(metadata)
+        .setAcls(getAclList())
         .build();
 
     OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
@@ -819,6 +822,7 @@ public class RpcClient implements ClientProtocol, KeyProviderTokenIssuer {
         .setKeyName(keyName)
         .setType(HddsProtos.ReplicationType.valueOf(type.toString()))
         .setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue()))
+        .setAcls(getAclList())
         .build();
     OmMultipartInfo multipartInfo = ozoneManagerClient
         .initiateMultipartUpload(keyArgs);
@@ -848,6 +852,7 @@ public class RpcClient implements ClientProtocol, KeyProviderTokenIssuer {
         .setIsMultipartKey(true)
         .setMultipartUploadID(uploadID)
         .setMultipartUploadPartNumber(partNumber)
+        .setAcls(getAclList())
         .build();
 
     OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
@@ -963,7 +968,10 @@ public class RpcClient implements ClientProtocol, KeyProviderTokenIssuer {
   public void createDirectory(String volumeName, String bucketName,
       String keyName) throws IOException {
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
-        .setBucketName(bucketName).setKeyName(keyName).build();
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setAcls(getAclList())
+        .build();
     ozoneManagerClient.createDirectory(keyArgs);
   }
 
@@ -990,6 +998,7 @@ public class RpcClient implements ClientProtocol, KeyProviderTokenIssuer {
         .setDataSize(size)
         .setType(HddsProtos.ReplicationType.valueOf(type.name()))
         .setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue()))
+        .setAcls(getAclList())
         .build();
     OpenKeySession keySession =
         ozoneManagerClient.createFile(keyArgs, overWrite, recursive);

+ 2 - 1
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OzoneAcl.java

@@ -46,9 +46,10 @@ public class OzoneAcl {
   private ACLIdentityType type;
   private String name;
   private BitSet aclBitSet;
+  public static final BitSet ZERO_BITSET = new BitSet(0);
 
   /**
-   * Constructor for OzoneAcl.
+   * Default constructor.
    */
   public OzoneAcl() {
   }

+ 16 - 2
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.om.helpers;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.audit.Auditable;
 
@@ -45,13 +46,15 @@ public final class OmKeyArgs implements Auditable {
   private final int multipartUploadPartNumber;
   private Map<String, String> metadata;
   private boolean refreshPipeline;
+  private List<OzoneAcl> acls;
 
   @SuppressWarnings("parameternumber")
   private OmKeyArgs(String volumeName, String bucketName, String keyName,
       long dataSize, ReplicationType type, ReplicationFactor factor,
       List<OmKeyLocationInfo> locationInfoList, boolean isMultipart,
       String uploadID, int partNumber,
-      Map<String, String> metadataMap, boolean refreshPipeline) {
+      Map<String, String> metadataMap, boolean refreshPipeline,
+      List<OzoneAcl> acls) {
     this.volumeName = volumeName;
     this.bucketName = bucketName;
     this.keyName = keyName;
@@ -64,6 +67,7 @@ public final class OmKeyArgs implements Auditable {
     this.multipartUploadPartNumber = partNumber;
     this.metadata = metadataMap;
     this.refreshPipeline = refreshPipeline;
+    this.acls = acls;
   }
 
   public boolean getIsMultipartKey() {
@@ -86,6 +90,10 @@ public final class OmKeyArgs implements Auditable {
     return factor;
   }
 
+  public List<OzoneAcl> getAcls() {
+    return acls;
+  }
+
   public String getVolumeName() {
     return volumeName;
   }
@@ -166,6 +174,7 @@ public final class OmKeyArgs implements Auditable {
     private int multipartUploadPartNumber;
     private Map<String, String> metadata = new HashMap<>();
     private boolean refreshPipeline;
+    private List<OzoneAcl> acls;
 
     public Builder setVolumeName(String volume) {
       this.volumeName = volume;
@@ -202,6 +211,11 @@ public final class OmKeyArgs implements Auditable {
       return this;
     }
 
+    public Builder setAcls(List<OzoneAcl> listOfAcls) {
+      this.acls = listOfAcls;
+      return this;
+    }
+
     public Builder setIsMultipartKey(boolean isMultipart) {
       this.isMultipartKey = isMultipart;
       return this;
@@ -235,7 +249,7 @@ public final class OmKeyArgs implements Auditable {
     public OmKeyArgs build() {
       return new OmKeyArgs(volumeName, bucketName, keyName, dataSize, type,
           factor, locationInfoList, isMultipartKey, multipartUploadID,
-          multipartUploadPartNumber, metadata, refreshPipeline);
+          multipartUploadPartNumber, metadata, refreshPipeline, acls);
     }
 
   }

+ 23 - 3
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java

@@ -28,6 +28,7 @@ import java.util.stream.Collectors;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
 import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
 import org.apache.hadoop.util.Time;
 
@@ -50,6 +51,10 @@ public final class OmKeyInfo extends WithMetadata {
   private HddsProtos.ReplicationType type;
   private HddsProtos.ReplicationFactor factor;
   private FileEncryptionInfo encInfo;
+  /**
+   * ACL Information.
+   */
+  private List<OzoneAclInfo> acls;
 
   @SuppressWarnings("parameternumber")
   OmKeyInfo(String volumeName, String bucketName, String keyName,
@@ -58,7 +63,7 @@ public final class OmKeyInfo extends WithMetadata {
       HddsProtos.ReplicationType type,
       HddsProtos.ReplicationFactor factor,
       Map<String, String> metadata,
-      FileEncryptionInfo encInfo) {
+      FileEncryptionInfo encInfo, List<OzoneAclInfo> acls) {
     this.volumeName = volumeName;
     this.bucketName = bucketName;
     this.keyName = keyName;
@@ -81,6 +86,7 @@ public final class OmKeyInfo extends WithMetadata {
     this.type = type;
     this.metadata = metadata;
     this.encInfo = encInfo;
+    this.acls = acls;
   }
 
   public String getVolumeName() {
@@ -216,6 +222,10 @@ public final class OmKeyInfo extends WithMetadata {
     return encInfo;
   }
 
+  public List<OzoneAclInfo> getAcls() {
+    return acls;
+  }
+
   /**
    * Builder of OmKeyInfo.
    */
@@ -232,6 +242,7 @@ public final class OmKeyInfo extends WithMetadata {
     private HddsProtos.ReplicationFactor factor;
     private Map<String, String> metadata;
     private FileEncryptionInfo encInfo;
+    private List<OzoneAclInfo> acls;
 
     public Builder() {
       this.metadata = new HashMap<>();
@@ -299,11 +310,16 @@ public final class OmKeyInfo extends WithMetadata {
       return this;
     }
 
+    public Builder setAcls(List<OzoneAclInfo> listOfAcls) {
+      this.acls = listOfAcls;
+      return this;
+    }
+
     public OmKeyInfo build() {
       return new OmKeyInfo(
           volumeName, bucketName, keyName, omKeyLocationInfoGroups,
           dataSize, creationTime, modificationTime, type, factor, metadata,
-          encInfo);
+          encInfo, acls);
     }
   }
 
@@ -327,6 +343,9 @@ public final class OmKeyInfo extends WithMetadata {
     if (encInfo != null) {
       kb.setFileEncryptionInfo(OMPBHelper.convert(encInfo));
     }
+    if(acls != null) {
+      kb.addAllAcls(acls);
+    }
     return kb.build();
   }
 
@@ -345,7 +364,8 @@ public final class OmKeyInfo extends WithMetadata {
         keyInfo.getFactor(),
         KeyValueUtil.getFromProtobuf(keyInfo.getMetadataList()),
         keyInfo.hasFileEncryptionInfo() ? OMPBHelper.convert(keyInfo
-            .getFileEncryptionInfo()): null);
+            .getFileEncryptionInfo()): null,
+        keyInfo.getAclsList());
   }
 
   @Override

+ 30 - 4
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmOzoneAclMap.java

@@ -36,6 +36,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Objects;
 
+import static org.apache.hadoop.ozone.OzoneAcl.ZERO_BITSET;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
 import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclRights.ALL;
 
@@ -81,8 +82,17 @@ public class OmOzoneAclMap {
     if (!getMap(aclType).containsKey(acl.getName())) {
       getMap(aclType).put(acl.getName(), acl.getAclBitSet());
     } else {
-      // throw exception if acl is already added.
-      throw new OMException("Acl " + acl + " already exist.", INVALID_REQUEST);
+      // Check if we are adding new rights to existing acl.
+      BitSet temp = (BitSet) acl.getAclBitSet().clone();
+      BitSet curRights = (BitSet) getMap(aclType).get(acl.getName()).clone();
+      temp.or(curRights);
+
+      if (temp.equals(curRights)) {
+        // throw exception if acl is already added.
+        throw new OMException("Acl " + acl + " already exist.",
+            INVALID_REQUEST);
+      }
+      getMap(aclType).get(acl.getName()).or(acl.getAclBitSet());
     }
   }
 
@@ -105,9 +115,25 @@ public class OmOzoneAclMap {
     Objects.requireNonNull(acl, "Acl should not be null.");
     OzoneAclType aclType = OzoneAclType.valueOf(acl.getType().name());
     if (getMap(aclType).containsKey(acl.getName())) {
-      getMap(aclType).remove(acl.getName());
+      BitSet aclRights = getMap(aclType).get(acl.getName());
+      BitSet bits = (BitSet) acl.getAclBitSet().clone();
+      bits.and(aclRights);
+
+      if (bits.equals(ZERO_BITSET)) {
+        // throw exception if acl doesn't exist.
+        throw new OMException("Acl [" + acl + "] doesn't exist.",
+            INVALID_REQUEST);
+      }
+
+      acl.getAclBitSet().and(aclRights);
+      aclRights.xor(acl.getAclBitSet());
+
+      // Remove the acl as all rights are already set to 0.
+      if (aclRights.equals(ZERO_BITSET)) {
+        getMap(aclType).remove(acl.getName());
+      }
     } else {
-      // throw exception if acl is already added.
+      // throw exception if acl doesn't exist.
       throw new OMException("Acl [" + acl + "] doesn't exist.",
           INVALID_REQUEST);
     }

+ 11 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java

@@ -668,6 +668,11 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
         .setBucketName(args.getBucketName())
         .setKeyName(args.getKeyName());
 
+    if(args.getAcls() != null) {
+      keyArgs.addAllAcls(args.getAcls().stream().distinct().map(a ->
+          OzoneAcl.toProtobuf(a)).collect(Collectors.toList()));
+    }
+
     if (args.getFactor() != null) {
       keyArgs.setFactor(args.getFactor());
     }
@@ -991,6 +996,8 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
         .setBucketName(omKeyArgs.getBucketName())
         .setKeyName(omKeyArgs.getKeyName())
         .setFactor(omKeyArgs.getFactor())
+        .addAllAcls(omKeyArgs.getAcls().stream().map(a ->
+            OzoneAcl.toProtobuf(a)).collect(Collectors.toList()))
         .setType(omKeyArgs.getType());
     multipartInfoInitiateRequest.setKeyArgs(keyArgs.build());
 
@@ -1276,6 +1283,8 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
         .setVolumeName(args.getVolumeName())
         .setBucketName(args.getBucketName())
         .setKeyName(args.getKeyName())
+        .addAllAcls(args.getAcls().stream().map(a ->
+            OzoneAcl.toProtobuf(a)).collect(Collectors.toList()))
         .build();
     CreateDirectoryRequest request = CreateDirectoryRequest.newBuilder()
         .setKeyArgs(keyArgs)
@@ -1412,6 +1421,8 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
         .setDataSize(args.getDataSize())
         .setType(args.getType())
         .setFactor(args.getFactor())
+        .addAllAcls(args.getAcls().stream().map(a ->
+            OzoneAcl.toProtobuf(a)).collect(Collectors.toList()))
         .build();
     CreateFileRequest createFileRequest = CreateFileRequest.newBuilder()
             .setKeyArgs(keyArgs)

+ 33 - 24
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObjInfo.java

@@ -16,10 +16,10 @@
  */
 package org.apache.hadoop.ozone.security.acl;
 
-import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 
-import java.util.StringTokenizer;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
 
 /**
  * Class representing an ozone object.
@@ -45,16 +45,14 @@ public final class OzoneObjInfo extends OzoneObj {
     case VOLUME:
       return getVolumeName();
     case BUCKET:
-      return getVolumeName() + OzoneConsts.OZONE_URI_DELIMITER
-          + getBucketName();
+      return getVolumeName() + OZONE_URI_DELIMITER + getBucketName();
     case KEY:
-      return getVolumeName() + OzoneConsts.OZONE_URI_DELIMITER
-          + getBucketName() + OzoneConsts.OZONE_URI_DELIMITER + getKeyName();
+      return getVolumeName() + OZONE_URI_DELIMITER + getBucketName()
+          + OZONE_URI_DELIMITER + getKeyName();
     default:
       throw new IllegalArgumentException("Unknown resource " +
         "type" + getResourceType());
     }
-
   }
 
   @Override
@@ -77,25 +75,36 @@ public final class OzoneObjInfo extends OzoneObj {
     Builder builder = new Builder()
         .setResType(ResourceType.valueOf(proto.getResType().name()))
         .setStoreType(StoreType.valueOf(proto.getStoreType().name()));
-    StringTokenizer tokenizer = new StringTokenizer(proto.getPath(),
-        OzoneConsts.OZONE_URI_DELIMITER);
-    // Set volume name.
-    if (tokenizer.hasMoreTokens()) {
-      builder.setVolumeName(tokenizer.nextToken());
-    }
-    // Set bucket name.
-    if (tokenizer.hasMoreTokens()) {
-      builder.setBucketName(tokenizer.nextToken());
+    String[] tokens = StringUtils.splitPreserveAllTokens(proto.getPath(),
+        OZONE_URI_DELIMITER);
+    if(tokens == null) {
+      throw new IllegalArgumentException("Unexpected path:" + proto.getPath());
     }
-    // Set key name
-    if (tokenizer.hasMoreTokens()) {
-      StringBuffer sb = new StringBuffer();
-      while (tokenizer.hasMoreTokens()) {
-        sb.append(OzoneConsts.OZONE_URI_DELIMITER);
-        sb.append(tokenizer.nextToken());
-        sb.append(OzoneConsts.OZONE_URI_DELIMITER);
+    // Set volume name.
+    switch (proto.getResType()) {
+    case VOLUME:
+      builder.setVolumeName(tokens[0]);
+      break;
+    case BUCKET:
+      if (tokens.length < 2) {
+        throw new IllegalArgumentException("Unexpected argument for " +
+            "Ozone key. Path:" + proto.getPath());
+      }
+      builder.setVolumeName(tokens[0]);
+      builder.setBucketName(tokens[1]);
+      break;
+    case KEY:
+      if (tokens.length != 3) {
+        throw new IllegalArgumentException("Unexpected argument for " +
+            "Ozone key. Path:" + proto.getPath());
       }
-      builder.setKeyName(sb.toString());
+      builder.setVolumeName(tokens[0]);
+      builder.setBucketName(tokens[1]);
+      builder.setKeyName(tokens[2]);
+      break;
+    default:
+      throw new IllegalArgumentException("Unexpected type for " +
+          "Ozone key. Type:" + proto.getResType());
     }
     return builder.build();
   }

+ 29 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/web/utils/OzoneUtils.java

@@ -23,6 +23,8 @@ import java.net.UnknownHostException;
 import java.nio.charset.Charset;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Locale;
 import java.util.TimeZone;
 import java.util.UUID;
@@ -31,11 +33,16 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConsts;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
 import org.apache.ratis.util.TimeDuration;
 
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.GROUP;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.USER;
+
 /**
  * Set of Utility functions used in ozone.
  */
@@ -236,4 +243,26 @@ public final class OzoneUtils {
     return getTimeDuration(conf, key, defaultValue)
         .toLong(TimeUnit.MILLISECONDS);
   }
+
+  /**
+   * Helper function to get deafult acl list for current user.
+   *
+   * @param userName
+   * @param userGroups
+   * @return listOfAcls
+   * */
+  public static List<OzoneAcl> getAclList(String userName,
+      List<String> userGroups, ACLType userRights, ACLType groupRights) {
+
+    List<OzoneAcl> listOfAcls = new ArrayList<>();
+
+    // User ACL.
+    listOfAcls.add(new OzoneAcl(USER, userName, userRights));
+    if(userGroups != null) {
+      // Group ACLs of the User.
+      userGroups.stream().forEach((group) -> listOfAcls.add(
+          new OzoneAcl(GROUP, group, groupRights)));
+    }
+    return listOfAcls;
+  }
 }

+ 2 - 0
hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto

@@ -615,6 +615,7 @@ message KeyArgs {
     optional string multipartUploadID = 9;
     optional uint32 multipartNumber = 10;
     repeated hadoop.hdds.KeyValue metadata = 11;
+    repeated OzoneAclInfo acls = 12;
 }
 
 message KeyLocation {
@@ -652,6 +653,7 @@ message KeyInfo {
     optional uint64 latestVersion = 10;
     repeated hadoop.hdds.KeyValue metadata = 11;
     optional FileEncryptionInfoProto fileEncryptionInfo = 12;
+    repeated OzoneAclInfo acls = 13;
 }
 
 message OzoneFileStatusProto {

+ 175 - 109
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java

@@ -84,8 +84,10 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
 import org.apache.hadoop.ozone.s3.util.OzoneS3Util;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
+import org.apache.hadoop.ozone.security.acl.OzoneAclConfig;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.Time;
@@ -94,6 +96,9 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.RandomUtils;
+
+import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
+import static org.apache.hadoop.hdds.client.ReplicationType.STAND_ALONE;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.either;
 import org.junit.Assert;
@@ -607,8 +612,8 @@ public abstract class TestOzoneRpcClientAbstract {
       String keyName = UUID.randomUUID().toString();
 
       OzoneOutputStream out = bucket.createKey(keyName,
-          value.getBytes().length, ReplicationType.STAND_ALONE,
-          ReplicationFactor.ONE, new HashMap<>());
+          value.getBytes().length, STAND_ALONE,
+          ONE, new HashMap<>());
       out.write(value.getBytes());
       out.close();
       OzoneKey key = bucket.getKey(keyName);
@@ -617,8 +622,8 @@ public abstract class TestOzoneRpcClientAbstract {
       byte[] fileContent = new byte[value.getBytes().length];
       is.read(fileContent);
       Assert.assertTrue(verifyRatisReplication(volumeName, bucketName,
-          keyName, ReplicationType.STAND_ALONE,
-          ReplicationFactor.ONE));
+          keyName, STAND_ALONE,
+          ONE));
       Assert.assertEquals(value, new String(fileContent));
       Assert.assertTrue(key.getCreationTime() >= currentTime);
       Assert.assertTrue(key.getModificationTime() >= currentTime);
@@ -639,7 +644,7 @@ public abstract class TestOzoneRpcClientAbstract {
 
     // create the initial key with size 0, write will allocate the first block.
     OzoneOutputStream out = bucket.createKey(keyName, 0,
-        ReplicationType.STAND_ALONE, ReplicationFactor.ONE, new HashMap<>());
+        STAND_ALONE, ONE, new HashMap<>());
     out.write(value.getBytes());
     out.close();
     OmKeyArgs.Builder builder = new OmKeyArgs.Builder();
@@ -677,7 +682,7 @@ public abstract class TestOzoneRpcClientAbstract {
 
       OzoneOutputStream out = bucket.createKey(keyName,
           value.getBytes().length, ReplicationType.RATIS,
-          ReplicationFactor.ONE, new HashMap<>());
+          ONE, new HashMap<>());
       out.write(value.getBytes());
       out.close();
       OzoneKey key = bucket.getKey(keyName);
@@ -687,7 +692,7 @@ public abstract class TestOzoneRpcClientAbstract {
       is.read(fileContent);
       is.close();
       Assert.assertTrue(verifyRatisReplication(volumeName, bucketName,
-          keyName, ReplicationType.RATIS, ReplicationFactor.ONE));
+          keyName, ReplicationType.RATIS, ONE));
       Assert.assertEquals(value, new String(fileContent));
       Assert.assertTrue(key.getCreationTime() >= currentTime);
       Assert.assertTrue(key.getModificationTime() >= currentTime);
@@ -832,7 +837,7 @@ public abstract class TestOzoneRpcClientAbstract {
     // Write data into a key
     OzoneOutputStream out = bucket.createKey(keyName,
         value.getBytes().length, ReplicationType.RATIS,
-        ReplicationFactor.ONE, new HashMap<>());
+        ONE, new HashMap<>());
     out.write(value.getBytes());
     out.close();
 
@@ -904,8 +909,8 @@ public abstract class TestOzoneRpcClientAbstract {
     //String keyValue = "this is a test value.glx";
     // create the initial key with size 0, write will allocate the first block.
     OzoneOutputStream out = bucket.createKey(keyName,
-        keyValue.getBytes().length, ReplicationType.STAND_ALONE,
-        ReplicationFactor.ONE, new HashMap<>());
+        keyValue.getBytes().length, STAND_ALONE,
+        ONE, new HashMap<>());
     out.write(keyValue.getBytes());
     out.close();
 
@@ -993,7 +998,7 @@ public abstract class TestOzoneRpcClientAbstract {
     // Write data into a key
     OzoneOutputStream out = bucket.createKey(keyName,
         value.getBytes().length, ReplicationType.RATIS,
-        ReplicationFactor.ONE, new HashMap<>());
+        ONE, new HashMap<>());
     out.write(value.getBytes());
     out.close();
 
@@ -1161,8 +1166,8 @@ public abstract class TestOzoneRpcClientAbstract {
     volume.createBucket(bucketName);
     OzoneBucket bucket = volume.getBucket(bucketName);
     OzoneOutputStream out = bucket.createKey(keyName,
-        value.getBytes().length, ReplicationType.STAND_ALONE,
-        ReplicationFactor.ONE, new HashMap<>());
+        value.getBytes().length, STAND_ALONE,
+        ONE, new HashMap<>());
     out.write(value.getBytes());
     out.close();
     OzoneKey key = bucket.getKey(keyName);
@@ -1185,8 +1190,8 @@ public abstract class TestOzoneRpcClientAbstract {
     volume.createBucket(bucketName);
     OzoneBucket bucket = volume.getBucket(bucketName);
     OzoneOutputStream out = bucket.createKey(fromKeyName,
-        value.getBytes().length, ReplicationType.STAND_ALONE,
-        ReplicationFactor.ONE, new HashMap<>());
+        value.getBytes().length, STAND_ALONE,
+        ONE, new HashMap<>());
     out.write(value.getBytes());
     out.close();
     OzoneKey key = bucket.getKey(fromKeyName);
@@ -1380,25 +1385,25 @@ public abstract class TestOzoneRpcClientAbstract {
       byte[] value = RandomStringUtils.randomAscii(10240).getBytes();
       OzoneOutputStream one = volAbucketA.createKey(
           keyBaseA + i + "-" + RandomStringUtils.randomNumeric(5),
-          value.length, ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
+          value.length, STAND_ALONE, ONE,
           new HashMap<>());
       one.write(value);
       one.close();
       OzoneOutputStream two = volAbucketB.createKey(
           keyBaseA + i + "-" + RandomStringUtils.randomNumeric(5),
-          value.length, ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
+          value.length, STAND_ALONE, ONE,
           new HashMap<>());
       two.write(value);
       two.close();
       OzoneOutputStream three = volBbucketA.createKey(
           keyBaseA + i + "-" + RandomStringUtils.randomNumeric(5),
-          value.length, ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
+          value.length, STAND_ALONE, ONE,
           new HashMap<>());
       three.write(value);
       three.close();
       OzoneOutputStream four = volBbucketB.createKey(
           keyBaseA + i + "-" + RandomStringUtils.randomNumeric(5),
-          value.length, ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
+          value.length, STAND_ALONE, ONE,
           new HashMap<>());
       four.write(value);
       four.close();
@@ -1413,25 +1418,25 @@ public abstract class TestOzoneRpcClientAbstract {
       byte[] value = RandomStringUtils.randomAscii(10240).getBytes();
       OzoneOutputStream one = volAbucketA.createKey(
           keyBaseB + i + "-" + RandomStringUtils.randomNumeric(5),
-          value.length, ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
+          value.length, STAND_ALONE, ONE,
           new HashMap<>());
       one.write(value);
       one.close();
       OzoneOutputStream two = volAbucketB.createKey(
           keyBaseB + i + "-" + RandomStringUtils.randomNumeric(5),
-          value.length, ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
+          value.length, STAND_ALONE, ONE,
           new HashMap<>());
       two.write(value);
       two.close();
       OzoneOutputStream three = volBbucketA.createKey(
           keyBaseB + i + "-" + RandomStringUtils.randomNumeric(5),
-          value.length, ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
+          value.length, STAND_ALONE, ONE,
           new HashMap<>());
       three.write(value);
       three.close();
       OzoneOutputStream four = volBbucketB.createKey(
           keyBaseB + i + "-" + RandomStringUtils.randomNumeric(5),
-          value.length, ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
+          value.length, STAND_ALONE, ONE,
           new HashMap<>());
       four.write(value);
       four.close();
@@ -1512,7 +1517,7 @@ public abstract class TestOzoneRpcClientAbstract {
     volume.createBucket(bucketName);
     OzoneBucket bucket = volume.getBucket(bucketName);
     OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
-        ReplicationType.STAND_ALONE, ReplicationFactor.ONE);
+        STAND_ALONE, ONE);
 
     assertNotNull(multipartInfo);
     String uploadID = multipartInfo.getUploadID();
@@ -1524,7 +1529,7 @@ public abstract class TestOzoneRpcClientAbstract {
     // Call initiate multipart upload for the same key again, this should
     // generate a new uploadID.
     multipartInfo = bucket.initiateMultipartUpload(keyName,
-        ReplicationType.STAND_ALONE, ReplicationFactor.ONE);
+        STAND_ALONE, ONE);
 
     assertNotNull(multipartInfo);
     Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
@@ -1580,7 +1585,7 @@ public abstract class TestOzoneRpcClientAbstract {
     volume.createBucket(bucketName);
     OzoneBucket bucket = volume.getBucket(bucketName);
     OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
-        ReplicationType.STAND_ALONE, ReplicationFactor.ONE);
+        STAND_ALONE, ONE);
 
     assertNotNull(multipartInfo);
     String uploadID = multipartInfo.getUploadID();
@@ -1618,7 +1623,7 @@ public abstract class TestOzoneRpcClientAbstract {
     volume.createBucket(bucketName);
     OzoneBucket bucket = volume.getBucket(bucketName);
     OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
-        ReplicationType.STAND_ALONE, ReplicationFactor.ONE);
+        STAND_ALONE, ONE);
 
     assertNotNull(multipartInfo);
     String uploadID = multipartInfo.getUploadID();
@@ -1746,7 +1751,6 @@ public abstract class TestOzoneRpcClientAbstract {
     OzoneBucket bucket = volume.getBucket(bucketName);
 
     doMultipartUpload(bucket, keyName, (byte)98);
-
   }
 
 
@@ -1782,18 +1786,18 @@ public abstract class TestOzoneRpcClientAbstract {
     OzoneBucket bucket = volume.getBucket(bucketName);
 
     // Initiate multipart upload
-    String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType
-        .STAND_ALONE, ReplicationFactor.ONE);
+    String uploadID = initiateMultipartUpload(bucket, keyName, STAND_ALONE,
+        ONE);
 
     // Upload Parts
     Map<Integer, String> partsMap = new TreeMap<>();
     // Uploading part 1 with less than min size
-    String partName = uploadPart(bucket, keyName, uploadID, 1, "data".getBytes(
-        UTF_8));
+    String partName = uploadPart(bucket, keyName, uploadID, 1,
+        "data".getBytes(UTF_8));
     partsMap.put(1, partName);
 
-    partName = uploadPart(bucket, keyName, uploadID, 2, "data".getBytes(
-        UTF_8));
+    partName = uploadPart(bucket, keyName, uploadID, 2,
+        "data".getBytes(UTF_8));
     partsMap.put(2, partName);
 
 
@@ -1815,8 +1819,8 @@ public abstract class TestOzoneRpcClientAbstract {
     volume.createBucket(bucketName);
     OzoneBucket bucket = volume.getBucket(bucketName);
 
-    String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType
-        .STAND_ALONE, ReplicationFactor.ONE);
+    String uploadID = initiateMultipartUpload(bucket, keyName, STAND_ALONE,
+        ONE);
 
     // We have not uploaded any parts, but passing some list it should throw
     // error.
@@ -1840,8 +1844,8 @@ public abstract class TestOzoneRpcClientAbstract {
     volume.createBucket(bucketName);
     OzoneBucket bucket = volume.getBucket(bucketName);
 
-    String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType
-        .STAND_ALONE, ReplicationFactor.ONE);
+    String uploadID = initiateMultipartUpload(bucket, keyName, STAND_ALONE,
+        ONE);
 
     uploadPart(bucket, keyName, uploadID, 1, "data".getBytes(UTF_8));
     // We have not uploaded any parts, but passing some list it should throw
@@ -1865,8 +1869,8 @@ public abstract class TestOzoneRpcClientAbstract {
     volume.createBucket(bucketName);
     OzoneBucket bucket = volume.getBucket(bucketName);
 
-    String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType
-        .STAND_ALONE, ReplicationFactor.ONE);
+    String uploadID = initiateMultipartUpload(bucket, keyName, STAND_ALONE,
+        ONE);
 
     uploadPart(bucket, keyName, uploadID, 1, "data".getBytes(UTF_8));
     // We have not uploaded any parts, but passing some list it should throw
@@ -1905,8 +1909,8 @@ public abstract class TestOzoneRpcClientAbstract {
     volume.createBucket(bucketName);
     OzoneBucket bucket = volume.getBucket(bucketName);
 
-    String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType
-        .STAND_ALONE, ReplicationFactor.ONE);
+    String uploadID = initiateMultipartUpload(bucket, keyName, STAND_ALONE,
+        ONE);
     bucket.abortMultipartUpload(keyName, uploadID);
   }
 
@@ -1921,8 +1925,8 @@ public abstract class TestOzoneRpcClientAbstract {
     volume.createBucket(bucketName);
     OzoneBucket bucket = volume.getBucket(bucketName);
 
-    String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType
-        .STAND_ALONE, ReplicationFactor.ONE);
+    String uploadID = initiateMultipartUpload(bucket, keyName, STAND_ALONE,
+        ONE);
     uploadPart(bucket, keyName, uploadID, 1, "data".getBytes(UTF_8));
     bucket.abortMultipartUpload(keyName, uploadID);
   }
@@ -1939,8 +1943,8 @@ public abstract class TestOzoneRpcClientAbstract {
     OzoneBucket bucket = volume.getBucket(bucketName);
 
     Map<Integer, String> partsMap = new TreeMap<>();
-    String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType
-        .STAND_ALONE, ReplicationFactor.ONE);
+    String uploadID = initiateMultipartUpload(bucket, keyName, STAND_ALONE,
+        ONE);
     String partName1 = uploadPart(bucket, keyName, uploadID, 1,
         generateData(OzoneConsts.OM_MULTIPART_MIN_SIZE, (byte)97));
     partsMap.put(1, partName1);
@@ -1956,7 +1960,7 @@ public abstract class TestOzoneRpcClientAbstract {
     OzoneMultipartUploadPartListParts ozoneMultipartUploadPartListParts =
         bucket.listParts(keyName, uploadID, 0, 3);
 
-    Assert.assertEquals(ReplicationType.STAND_ALONE,
+    Assert.assertEquals(STAND_ALONE,
         ozoneMultipartUploadPartListParts.getReplicationType());
     Assert.assertEquals(3,
         ozoneMultipartUploadPartListParts.getPartInfoList().size());
@@ -1990,8 +1994,8 @@ public abstract class TestOzoneRpcClientAbstract {
     OzoneBucket bucket = volume.getBucket(bucketName);
 
     Map<Integer, String> partsMap = new TreeMap<>();
-    String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType
-        .STAND_ALONE, ReplicationFactor.ONE);
+    String uploadID = initiateMultipartUpload(bucket, keyName, STAND_ALONE,
+        ONE);
     String partName1 = uploadPart(bucket, keyName, uploadID, 1,
         generateData(OzoneConsts.OM_MULTIPART_MIN_SIZE, (byte)97));
     partsMap.put(1, partName1);
@@ -2007,7 +2011,7 @@ public abstract class TestOzoneRpcClientAbstract {
     OzoneMultipartUploadPartListParts ozoneMultipartUploadPartListParts =
         bucket.listParts(keyName, uploadID, 0, 2);
 
-    Assert.assertEquals(ReplicationType.STAND_ALONE,
+    Assert.assertEquals(STAND_ALONE,
         ozoneMultipartUploadPartListParts.getReplicationType());
 
     Assert.assertEquals(2,
@@ -2095,8 +2099,8 @@ public abstract class TestOzoneRpcClientAbstract {
     OzoneBucket bucket = volume.getBucket(bucketName);
 
 
-    String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType
-        .STAND_ALONE, ReplicationFactor.ONE);
+    String uploadID = initiateMultipartUpload(bucket, keyName, STAND_ALONE,
+        ONE);
     uploadPart(bucket, keyName, uploadID, 1,
         generateData(OzoneConsts.OM_MULTIPART_MIN_SIZE, (byte)97));
 
@@ -2108,7 +2112,7 @@ public abstract class TestOzoneRpcClientAbstract {
 
     Assert.assertEquals(0,
         ozoneMultipartUploadPartListParts.getPartInfoList().size());
-    Assert.assertEquals(ReplicationType.STAND_ALONE,
+    Assert.assertEquals(STAND_ALONE,
         ozoneMultipartUploadPartListParts.getReplicationType());
 
     // As we don't have any parts with greater than partNumberMarker and list
@@ -2138,54 +2142,43 @@ public abstract class TestOzoneRpcClientAbstract {
   public void testNativeAclsForVolume() throws Exception {
     String volumeName = UUID.randomUUID().toString();
     store.createVolume(volumeName);
-    OzoneVolume volume = store.getVolume(volumeName);
+
     OzoneObj ozObj = new OzoneObjInfo.Builder()
         .setVolumeName(volumeName)
         .setResType(OzoneObj.ResourceType.VOLUME)
         .setStoreType(OzoneObj.StoreType.OZONE)
         .build();
-    // Get acls for volume.
-    List<OzoneAcl> volAcls = store.getAcl(ozObj);
-    volAcls.forEach(a -> assertTrue(volume.getAcls().contains(a)));
 
-    // Remove all acl's.
-    for (OzoneAcl a : volAcls) {
-      store.removeAcl(ozObj, a);
-    }
-    List<OzoneAcl> newAcls = store.getAcl(ozObj);
-    OzoneVolume finalVolume = store.getVolume(volumeName);
-    assertTrue(finalVolume.getAcls().size() == 0);
-    assertTrue(newAcls.size() == 0);
-
-    // Add acl's and then call getAcl.
-    for (OzoneAcl a : volAcls) {
-      // Try removing an acl which doesn't exist, it should return false.
-      assertFalse(finalVolume.getAcls().contains(a));
-      assertFalse(store.removeAcl(ozObj, a));
+    validateOzoneAcl(ozObj);
+  }
 
-      assertTrue(store.addAcl(ozObj, a));
-      finalVolume = store.getVolume(volumeName);
-      assertTrue(finalVolume.getAcls().contains(a));
+  @Test
+  public void testNativeAclsForBucket() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
 
-      // Call addAcl again, this time operation will fail as
-      // acl is already added.
-      assertFalse(store.addAcl(ozObj, a));
-    }
-    assertTrue(finalVolume.getAcls().size() == volAcls.size());
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+    assertNotNull("Bucket creation failed", bucket);
 
+    OzoneObj ozObj = new OzoneObjInfo.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setResType(OzoneObj.ResourceType.BUCKET)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .build();
 
-    // Reset acl's.
-    store.setAcl(ozObj, newAcls);
-    finalVolume = store.getVolume(volumeName);
-    newAcls = store.getAcl(ozObj);
-    assertTrue(newAcls.size() == 0);
-    assertTrue(finalVolume.getAcls().size() == 0);
+    validateOzoneAcl(ozObj);
   }
 
   @Test
-  public void testNativeAclsForBucket() throws Exception {
+  public void testNativeAclsForKey() throws Exception {
     String volumeName = UUID.randomUUID().toString();
     String bucketName = UUID.randomUUID().toString();
+    String key1 = UUID.randomUUID().toString();
+    String key2 = UUID.randomUUID().toString();
 
     store.createVolume(volumeName);
     OzoneVolume volume = store.getVolume(volumeName);
@@ -2193,48 +2186,121 @@ public abstract class TestOzoneRpcClientAbstract {
     OzoneBucket bucket = volume.getBucket(bucketName);
     assertNotNull("Bucket creation failed", bucket);
 
+    writeKey(key1, bucket);
+    writeKey(key2, bucket);
+
     OzoneObj ozObj = new OzoneObjInfo.Builder()
         .setVolumeName(volumeName)
         .setBucketName(bucketName)
-        .setResType(OzoneObj.ResourceType.BUCKET)
+        .setKeyName(key1)
+        .setResType(OzoneObj.ResourceType.KEY)
         .setStoreType(OzoneObj.StoreType.OZONE)
         .build();
+
+    validateOzoneAcl(ozObj);
+  }
+
+  /**
+   * Helper function to get default acl list for current user.
+   *
+   * @return list of default Acls.
+   * @throws IOException
+   * */
+  private List<OzoneAcl> getAclList(OzoneConfiguration conf)
+      throws IOException {
+    List<OzoneAcl> listOfAcls = new ArrayList<>();
+    //User ACL
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    OzoneAclConfig aclConfig = conf.getObject(OzoneAclConfig.class);
+    ACLType userRights = aclConfig.getUserDefaultRights();
+    ACLType groupRights = aclConfig.getGroupDefaultRights();
+
+    listOfAcls.add(new OzoneAcl(ACLIdentityType.USER,
+        ugi.getUserName(), userRights));
+    //Group ACLs of the User
+    List<String> userGroups = Arrays.asList(UserGroupInformation
+        .createRemoteUser(ugi.getUserName()).getGroupNames());
+    userGroups.stream().forEach((group) -> listOfAcls.add(
+        new OzoneAcl(ACLIdentityType.GROUP, group, groupRights)));
+    return listOfAcls;
+  }
+
+  /**
+   * Helper function to validate ozone Acl for given object.
+   * @param ozObj
+   * */
+  private void validateOzoneAcl(OzoneObj ozObj) throws IOException {
     // Get acls for volume.
-    List<OzoneAcl> volAcls = store.getAcl(ozObj);
-    volAcls.forEach(a -> assertTrue(bucket.getAcls().contains(a)));
+    List<OzoneAcl> expectedAcls = getAclList(new OzoneConfiguration());
+
+    // Case:1 Add new acl permission to existing acl.
+    if(expectedAcls.size()>0) {
+      OzoneAcl oldAcl = expectedAcls.get(0);
+      OzoneAcl newAcl = new OzoneAcl(oldAcl.getType(), oldAcl.getName(),
+          ACLType.READ_ACL);
+      // Verify that operation successful.
+      assertTrue(store.addAcl(ozObj, newAcl));
+      List<OzoneAcl> acls = store.getAcl(ozObj);
+
+      assertTrue(acls.size() == expectedAcls.size());
+      boolean aclVerified = false;
+      for(OzoneAcl acl: acls) {
+        if(acl.getName().equals(newAcl.getName())) {
+          assertTrue(acl.getAclList().contains(ACLType.READ_ACL));
+          aclVerified = true;
+        }
+      }
+      assertTrue("New acl expected but not found.", aclVerified);
+      aclVerified = false;
+
+      // Case:2 Remove newly added acl permission.
+      assertTrue(store.removeAcl(ozObj, newAcl));
+      acls = store.getAcl(ozObj);
+      assertTrue(acls.size() == expectedAcls.size());
+      for(OzoneAcl acl: acls) {
+        if(acl.getName().equals(newAcl.getName())) {
+          assertFalse(acl.getAclList().contains(ACLType.READ_ACL));
+          aclVerified = true;
+        }
+      }
+      assertTrue("New acl expected but not found.", aclVerified);
+    } else {
+      fail("Default acl should not be empty.");
+    }
+
+    List<OzoneAcl> keyAcls = store.getAcl(ozObj);
+    expectedAcls.forEach(a -> assertTrue(keyAcls.contains(a)));
 
     // Remove all acl's.
-    for (OzoneAcl a : volAcls) {
-      assertTrue(store.removeAcl(ozObj, a));
+    for (OzoneAcl a : expectedAcls) {
+      store.removeAcl(ozObj, a);
     }
     List<OzoneAcl> newAcls = store.getAcl(ozObj);
-    OzoneBucket finalBuck = volume.getBucket(bucketName);
-    assertTrue(finalBuck.getAcls().size() == 0);
     assertTrue(newAcls.size() == 0);
 
     // Add acl's and then call getAcl.
-    for (OzoneAcl a : volAcls) {
-      // Try removing an acl which doesn't exist, it should return false.
-      assertFalse(finalBuck.getAcls().contains(a));
-      assertFalse(store.removeAcl(ozObj, a));
-
-      // Add acl should succeed.
+    int aclCount = 0;
+    for (OzoneAcl a : expectedAcls) {
+      aclCount++;
       assertTrue(store.addAcl(ozObj, a));
-      finalBuck = volume.getBucket(bucketName);
-      assertTrue(finalBuck.getAcls().contains(a));
-
-      // Call addAcl again, this time operation will return false as
-      // acl is already added.
-      assertFalse(store.addAcl(ozObj, a));
+      assertTrue(store.getAcl(ozObj).size() == aclCount);
     }
-    assertTrue(finalBuck.getAcls().size() == volAcls.size());
+    newAcls = store.getAcl(ozObj);
+    assertTrue(newAcls.size() == expectedAcls.size());
+    List<OzoneAcl> finalNewAcls = newAcls;
+    expectedAcls.forEach(a -> assertTrue(finalNewAcls.contains(a)));
 
     // Reset acl's.
-    store.setAcl(ozObj, newAcls);
-    finalBuck = volume.getBucket(bucketName);
+    store.setAcl(ozObj, new ArrayList<>());
     newAcls = store.getAcl(ozObj);
     assertTrue(newAcls.size() == 0);
-    assertTrue(finalBuck.getAcls().size() == 0);
+  }
+
+  private void writeKey(String key1, OzoneBucket bucket) throws IOException {
+    OzoneOutputStream out = bucket.createKey(key1, 1024, STAND_ALONE,
+        ONE, new HashMap<>());
+    out.write(RandomStringUtils.random(1024).getBytes());
+    out.close();
   }
 
   private byte[] generateData(int size, byte val) {

+ 2 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestMultipleContainerReadWrite.java

@@ -112,6 +112,7 @@ public class TestMultipleContainerReadWrite {
     String dataString = RandomStringUtils.randomAscii(3 * (int)OzoneConsts.MB);
     KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
     keyArgs.setSize(3 * (int)OzoneConsts.MB);
+    keyArgs.setUserName(userName);
 
     try (OutputStream outputStream = storageHandler.newKeyWriter(keyArgs)) {
       outputStream.write(dataString.getBytes());
@@ -190,6 +191,7 @@ public class TestMultipleContainerReadWrite {
     String dataString = RandomStringUtils.randomAscii(500);
     KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
     keyArgs.setSize(500);
+    keyArgs.setUserName(userName);
 
     try (OutputStream outputStream = storageHandler.newKeyWriter(keyArgs)) {
       outputStream.write(dataString.getBytes());

+ 3 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java

@@ -44,6 +44,7 @@ import org.junit.rules.ExpectedException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -116,6 +117,7 @@ public class TestOmBlockVersioning {
         .setKeyName(keyName)
         .setDataSize(1000)
         .setRefreshPipeline(true)
+        .setAcls(new ArrayList<>())
         .build();
 
     // 1st update, version 0
@@ -220,6 +222,7 @@ public class TestOmBlockVersioning {
 
     String dataString = RandomStringUtils.randomAlphabetic(100);
     KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
+    keyArgs.setUserName(userName);
     // this write will create 1st version with one block
     try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
       stream.write(dataString.getBytes());

+ 4 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java

@@ -486,6 +486,7 @@ public class TestOzoneManager {
     String dataString = RandomStringUtils.randomAscii(100);
     KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
     keyArgs.setSize(100);
+    keyArgs.setUserName(userName);
     try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
       stream.write(dataString.getBytes());
     }
@@ -525,6 +526,7 @@ public class TestOzoneManager {
     String dataString = RandomStringUtils.randomAscii(100);
     KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
     keyArgs.setSize(100);
+    keyArgs.setUserName(userName);
     try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
       stream.write(dataString.getBytes());
     }
@@ -567,6 +569,7 @@ public class TestOzoneManager {
 
     KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
     keyArgs.setSize(100);
+    keyArgs.setUserName(userName);
     String dataString = RandomStringUtils.randomAscii(100);
     try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
       stream.write(dataString.getBytes());
@@ -577,6 +580,7 @@ public class TestOzoneManager {
     // That is this overwrite only overwrites the keys on OM. We need to
     // garbage collect those blocks from datanode.
     KeyArgs keyArgs2 = new KeyArgs(volumeName, bucketName, keyName, userArgs);
+    keyArgs2.setUserName(userName);
     storageHandler.newKeyWriter(keyArgs2);
     Assert
         .assertEquals(numKeyAllocateFails, omMetrics.getNumKeyAllocateFails());

+ 2 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestScmSafeMode.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.om;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.lang3.RandomStringUtils;
@@ -134,6 +135,7 @@ public class TestScmSafeMode {
         .setBucketName(bucketName)
         .setKeyName(keyName)
         .setDataSize(1000)
+        .setAcls(Collections.emptyList())
         .build();
     OmVolumeArgs volArgs = new OmVolumeArgs.Builder()
         .setAdminName(adminName)

+ 7 - 0
hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java

@@ -66,7 +66,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -444,6 +446,8 @@ public final class DistributedStorageHandler implements StorageHandler {
   @Override
   public OutputStream newKeyWriter(KeyArgs args) throws IOException,
       OzoneException {
+    Objects.requireNonNull(args.getUserName(),
+        "Username should not be null");
     OmKeyArgs keyArgs = new OmKeyArgs.Builder()
         .setVolumeName(args.getVolumeName())
         .setBucketName(args.getBucketName())
@@ -451,6 +455,9 @@ public final class DistributedStorageHandler implements StorageHandler {
         .setDataSize(args.getSize())
         .setType(xceiverClientManager.getType())
         .setFactor(xceiverClientManager.getFactor())
+        .setAcls(OzoneUtils.getAclList(args.getUserName(),
+            args.getGroups() != null ? Arrays.asList(args.getGroups()) : null,
+            ACLType.ALL, ACLType.ALL))
         .build();
     // contact OM to allocate a block for key.
     OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);

+ 62 - 54
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java

@@ -17,6 +17,8 @@
 package org.apache.hadoop.ozone.om;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.List;
 import java.util.Objects;
 
@@ -40,6 +42,7 @@ import org.iq80.leveldb.DBException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.ozone.OzoneAcl.ZERO_BITSET;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
 
 /**
@@ -404,27 +407,44 @@ public class BucketManagerImpl implements BucketManager {
         throw new OMException("Bucket " + bucket + " is not found",
             BUCKET_NOT_FOUND);
       }
-      List<OzoneAcl> list = bucketInfo.getAcls();
-      if(!validateAddAcl(acl, list)) {
-        // New acl can't be added as it is not consistent with existing ACLs.
-        LOG.info("New acl:{} can't be added as it is not consistent with " +
-            "existing ACLs:{}.", acl, StringUtils.join(",", list));
-        return false;
+
+      // Case 1: When we are adding more rights to existing user/group.
+      boolean addToExistingAcl = false;
+      for(OzoneAcl a: bucketInfo.getAcls()) {
+        if(a.getName().equals(acl.getName()) &&
+            a.getType().equals(acl.getType())) {
+          BitSet bits = (BitSet) acl.getAclBitSet().clone();
+          bits.or(a.getAclBitSet());
+
+          if (bits.equals(a.getAclBitSet())) {
+            return false;
+          }
+          a.getAclBitSet().or(acl.getAclBitSet());
+          addToExistingAcl = true;
+          break;
+        }
       }
-      list.add(acl);
-      OmBucketInfo updatedBucket = OmBucketInfo.newBuilder()
-          .setVolumeName(bucketInfo.getVolumeName())
-          .setBucketName(bucketInfo.getBucketName())
-          .setStorageType(bucketInfo.getStorageType())
-          .setIsVersionEnabled(bucketInfo.getIsVersionEnabled())
-          .setCreationTime(bucketInfo.getCreationTime())
-          .setBucketEncryptionKey(bucketInfo.getEncryptionKeyInfo())
-          .addAllMetadata(bucketInfo.getMetadata())
-          .setAcls(list)
-          .build();
-      // TODO:HDDS-1619 OM HA changes required for all acl operations.
 
-      metadataManager.getBucketTable().put(dbBucketKey, updatedBucket);
+      // Case 2: When a completely new acl is added.
+      if(!addToExistingAcl) {
+        List<OzoneAcl> newAcls = bucketInfo.getAcls();
+        if(newAcls == null) {
+          newAcls = new ArrayList<>();
+        }
+        newAcls.add(acl);
+        bucketInfo = OmBucketInfo.newBuilder()
+            .setVolumeName(bucketInfo.getVolumeName())
+            .setBucketName(bucketInfo.getBucketName())
+            .setStorageType(bucketInfo.getStorageType())
+            .setIsVersionEnabled(bucketInfo.getIsVersionEnabled())
+            .setCreationTime(bucketInfo.getCreationTime())
+            .setBucketEncryptionKey(bucketInfo.getEncryptionKeyInfo())
+            .addAllMetadata(bucketInfo.getMetadata())
+            .setAcls(newAcls)
+            .build();
+      }
+
+      metadataManager.getBucketTable().put(dbBucketKey, bucketInfo);
     } catch (IOException ex) {
       if (!(ex instanceof OMException)) {
         LOG.error("Add acl operation failed for bucket:{}/{} acl:{}",
@@ -466,26 +486,31 @@ public class BucketManagerImpl implements BucketManager {
         throw new OMException("Bucket " + bucket + " is not found",
             BUCKET_NOT_FOUND);
       }
-      List<OzoneAcl> list = bucketInfo.getAcls();
-      if (!list.contains(acl)) {
-        // Return false if acl doesn't exist in current ACLs.
-        LOG.info("Acl:{} not found in existing ACLs:{}.", acl,
-            StringUtils.join(",", list));
-        return false;
+
+      // When we are removing subset of rights from existing acl.
+      for(OzoneAcl a: bucketInfo.getAcls()) {
+        if(a.getName().equals(acl.getName()) &&
+            a.getType().equals(acl.getType())) {
+          BitSet bits = (BitSet) acl.getAclBitSet().clone();
+          bits.and(a.getAclBitSet());
+
+          if (bits.equals(ZERO_BITSET)) {
+            return false;
+          }
+          bits = (BitSet) acl.getAclBitSet().clone();
+          bits.and(a.getAclBitSet());
+          a.getAclBitSet().xor(bits);
+
+          if(a.getAclBitSet().equals(ZERO_BITSET)) {
+            bucketInfo.getAcls().remove(a);
+          }
+          break;
+        } else {
+          return false;
+        }
       }
-      list.remove(acl);
-      OmBucketInfo updatedBucket = OmBucketInfo.newBuilder()
-          .setVolumeName(bucketInfo.getVolumeName())
-          .setBucketName(bucketInfo.getBucketName())
-          .setStorageType(bucketInfo.getStorageType())
-          .setIsVersionEnabled(bucketInfo.getIsVersionEnabled())
-          .setCreationTime(bucketInfo.getCreationTime())
-          .setBucketEncryptionKey(bucketInfo.getEncryptionKeyInfo())
-          .addAllMetadata(bucketInfo.getMetadata())
-          .setAcls(list)
-          .build();
 
-      metadataManager.getBucketTable().put(dbBucketKey, updatedBucket);
+      metadataManager.getBucketTable().put(dbBucketKey, bucketInfo);
     } catch (IOException ex) {
       if (!(ex instanceof OMException)) {
         LOG.error("Remove acl operation failed for bucket:{}/{} acl:{}",
@@ -552,23 +577,6 @@ public class BucketManagerImpl implements BucketManager {
     return true;
   }
 
-  /**
-   * Validates if a new acl addition is consistent with current ACL list.
-   * @param newAcl new acl to be added.
-   * @param currentAcls list of acls.
-   *
-   * @return true if newAcl addition to existing acls is valid, else false.
-   * */
-  private boolean validateAddAcl(OzoneAcl newAcl, List<OzoneAcl> currentAcls) {
-
-    // Check 1: Check for duplicate.
-    if(currentAcls.contains(newAcl)) {
-      return false;
-    }
-
-    return true;
-  }
-
   /**
    * Returns list of ACLs for given Ozone object.
    *

+ 40 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java

@@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.om;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -35,6 +36,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .KeyInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .KeyLocation;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.utils.BackgroundService;
 
 import java.io.IOException;
@@ -285,4 +287,42 @@ public interface KeyManager extends OzoneManagerFS {
       String keyName, String uploadID, int partNumberMarker,
       int maxParts)  throws IOException;
 
+  /**
+   * Add acl for Ozone object. Return true if acl is added successfully else
+   * false.
+   * @param obj Ozone object for which acl should be added.
+   * @param acl ozone acl top be added.
+   *
+   * @throws IOException if there is error.
+   * */
+  boolean addAcl(OzoneObj obj, OzoneAcl acl) throws IOException;
+
+  /**
+   * Remove acl for Ozone object. Return true if acl is removed successfully
+   * else false.
+   * @param obj Ozone object.
+   * @param acl Ozone acl to be removed.
+   *
+   * @throws IOException if there is error.
+   * */
+  boolean removeAcl(OzoneObj obj, OzoneAcl acl) throws IOException;
+
+  /**
+   * Acls to be set for given Ozone object. This operations reset ACL for
+   * given object to list of ACLs provided in argument.
+   * @param obj Ozone object.
+   * @param acls List of acls.
+   *
+   * @throws IOException if there is error.
+   * */
+  boolean setAcl(OzoneObj obj, List<OzoneAcl> acls) throws IOException;
+
+  /**
+   * Returns list of ACLs for given Ozone object.
+   * @param obj Ozone object.
+   *
+   * @throws IOException if there is error.
+   * */
+  List<OzoneAcl> getAcl(OzoneObj obj) throws IOException;
+
 }

+ 340 - 19
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java

@@ -26,11 +26,13 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.security.GeneralSecurityException;
 import java.security.PrivilegedExceptionAction;
+import java.util.stream.Collectors;
 
 import com.google.common.base.Strings;
 import org.apache.commons.codec.digest.DigestUtils;
@@ -49,6 +51,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
@@ -74,7 +77,11 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .KeyInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .KeyLocation;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclRights;
 import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.ozone.common.BlockGroup;
@@ -108,6 +115,10 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_MULTIPART_MIN_SIZE;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
+import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.KEY;
 import static org.apache.hadoop.util.Time.monotonicNow;
 
 import org.slf4j.Logger;
@@ -218,14 +229,14 @@ public class KeyManagerImpl implements KeyManager {
       if (metadataManager.getVolumeTable().get(volumeKey) == null) {
         LOG.error("volume not found: {}", volumeName);
         throw new OMException("Volume not found",
-            OMException.ResultCodes.VOLUME_NOT_FOUND);
+            VOLUME_NOT_FOUND);
       }
 
       // if the volume exists but bucket does not exist, throw bucket not found
       // exception
       LOG.error("bucket not found: {}/{} ", volumeName, bucketName);
       throw new OMException("Bucket not found",
-          OMException.ResultCodes.BUCKET_NOT_FOUND);
+          BUCKET_NOT_FOUND);
     }
   }
 
@@ -243,7 +254,7 @@ public class KeyManagerImpl implements KeyManager {
     if (metadataManager.getBucketTable().get(bucketKey) == null) {
       LOG.error("bucket not found: {}/{} ", volumeName, bucketName);
       throw new OMException("Bucket not found",
-          ResultCodes.BUCKET_NOT_FOUND);
+          BUCKET_NOT_FOUND);
     }
   }
 
@@ -266,7 +277,7 @@ public class KeyManagerImpl implements KeyManager {
       LOG.error("Allocate block for a key not in open status in meta store" +
           " /{}/{}/{} with ID {}", volumeName, bucketName, keyName, clientID);
       throw new OMException("Open Key not found",
-          OMException.ResultCodes.KEY_NOT_FOUND);
+          KEY_NOT_FOUND);
     }
 
     OmKeyLocationInfo omKeyLocationInfo =
@@ -295,7 +306,7 @@ public class KeyManagerImpl implements KeyManager {
       LOG.error("Allocate block for a key not in open status in meta store" +
           " /{}/{}/{} with ID {}", volumeName, bucketName, keyName, clientID);
       throw new OMException("Open Key not found",
-          OMException.ResultCodes.KEY_NOT_FOUND);
+          KEY_NOT_FOUND);
     }
 
     // current version not committed, so new blocks coming now are added to
@@ -402,6 +413,9 @@ public class KeyManagerImpl implements KeyManager {
   @Override
   public OpenKeySession openKey(OmKeyArgs args) throws IOException {
     Preconditions.checkNotNull(args);
+    Preconditions.checkNotNull(args.getAcls(), "Default acls " +
+        "should be set.");
+
     String volumeName = args.getVolumeName();
     String bucketName = args.getBucketName();
     String keyName = args.getKeyName();
@@ -582,7 +596,7 @@ public class KeyManagerImpl implements KeyManager {
                                   ReplicationFactor factor,
                                   ReplicationType type, long size,
                                   FileEncryptionInfo encInfo) {
-    return new OmKeyInfo.Builder()
+    OmKeyInfo.Builder builder = new OmKeyInfo.Builder()
         .setVolumeName(keyArgs.getVolumeName())
         .setBucketName(keyArgs.getBucketName())
         .setKeyName(keyArgs.getKeyName())
@@ -593,8 +607,12 @@ public class KeyManagerImpl implements KeyManager {
         .setDataSize(size)
         .setReplicationType(type)
         .setReplicationFactor(factor)
-        .setFileEncryptionInfo(encInfo)
-        .build();
+        .setFileEncryptionInfo(encInfo);
+    if(keyArgs.getAcls() != null) {
+      builder.setAcls(keyArgs.getAcls().stream().map(a ->
+          OzoneAcl.toProtobuf(a)).collect(Collectors.toList()));
+    }
+    return builder.build();
   }
 
   @Override
@@ -615,7 +633,7 @@ public class KeyManagerImpl implements KeyManager {
       OmKeyInfo keyInfo = metadataManager.getOpenKeyTable().get(openKey);
       if (keyInfo == null) {
         throw new OMException("Commit a key without corresponding entry " +
-            objectKey, ResultCodes.KEY_NOT_FOUND);
+            objectKey, KEY_NOT_FOUND);
       }
       keyInfo.setDataSize(args.getDataSize());
       keyInfo.setModificationTime(Time.now());
@@ -655,7 +673,7 @@ public class KeyManagerImpl implements KeyManager {
         LOG.debug("volume:{} bucket:{} Key:{} not found",
             volumeName, bucketName, keyName);
         throw new OMException("Key not found",
-            OMException.ResultCodes.KEY_NOT_FOUND);
+            KEY_NOT_FOUND);
       }
       if (grpcBlockTokenEnabled) {
         String remoteUser = getRemoteUser().getShortUserName();
@@ -700,7 +718,7 @@ public class KeyManagerImpl implements KeyManager {
       LOG.debug("Get key failed for volume:{} bucket:{} key:{}",
           volumeName, bucketName, keyName, ex);
       throw new OMException(ex.getMessage(),
-          OMException.ResultCodes.KEY_NOT_FOUND);
+          KEY_NOT_FOUND);
     } finally {
       metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
     }
@@ -733,7 +751,7 @@ public class KeyManagerImpl implements KeyManager {
                 + "Key: {} not found.", volumeName, bucketName, fromKeyName,
             toKeyName, fromKeyName);
         throw new OMException("Key not found",
-            OMException.ResultCodes.KEY_NOT_FOUND);
+            KEY_NOT_FOUND);
       }
 
       // A rename is a no-op if the target and source name is same.
@@ -790,7 +808,7 @@ public class KeyManagerImpl implements KeyManager {
       OmKeyInfo keyInfo = metadataManager.getKeyTable().get(objectKey);
       if (keyInfo == null) {
         throw new OMException("Key not found",
-            OMException.ResultCodes.KEY_NOT_FOUND);
+            KEY_NOT_FOUND);
       } else {
         // directly delete key with no blocks from db. This key need not be
         // moved to deleted table.
@@ -922,6 +940,8 @@ public class KeyManagerImpl implements KeyManager {
           .setReplicationFactor(keyArgs.getFactor())
           .setOmKeyLocationInfos(Collections.singletonList(
               new OmKeyLocationInfoGroup(0, locations)))
+          .setAcls(keyArgs.getAcls().stream().map(a ->
+              OzoneAcl.toProtobuf(a)).collect(Collectors.toList()))
           .build();
       DBStore store = metadataManager.getStore();
       try (BatchOperation batch = store.initBatchOperation()) {
@@ -1155,13 +1175,13 @@ public class KeyManagerImpl implements KeyManager {
             .setDataSize(size)
             .setOmKeyLocationInfos(
                 Collections.singletonList(keyLocationInfoGroup))
-            .build();
+            .setAcls(omKeyArgs.getAcls().stream().map(a ->
+                OzoneAcl.toProtobuf(a)).collect(Collectors.toList())).build();
       } else {
         // Already a version exists, so we should add it as a new version.
         // But now as versioning is not supported, just following the commit
-        // key approach.
-        // When versioning support comes, then we can uncomment below code
-        // keyInfo.addNewVersion(locations);
+        // key approach. When versioning support comes, then we can uncomment
+        // below code keyInfo.addNewVersion(locations);
         keyInfo.updateLocationInfoList(locations);
       }
       DBStore store = metadataManager.getStore();
@@ -1330,6 +1350,305 @@ public class KeyManagerImpl implements KeyManager {
     }
   }
 
+  /**
+   * Add acl for Ozone object. Return true if acl is added successfully else
+   * false.
+   *
+   * @param obj Ozone object for which acl should be added.
+   * @param acl ozone acl top be added.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public boolean addAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
+    validateOzoneObj(obj);
+    String volume = obj.getVolumeName();
+    String bucket = obj.getBucketName();
+    String keyName = obj.getKeyName();
+
+    metadataManager.getLock().acquireBucketLock(volume, bucket);
+    try {
+      validateBucket(volume, bucket);
+      String objectKey = metadataManager.getOzoneKey(volume, bucket, keyName);
+      OmKeyInfo keyInfo = metadataManager.getKeyTable().get(objectKey);
+      Table keyTable;
+      if (keyInfo == null) {
+        keyInfo = metadataManager.getOpenKeyTable().get(objectKey);
+        if (keyInfo == null) {
+          throw new OMException("Key not found. Key:" +
+              objectKey, KEY_NOT_FOUND);
+        }
+        keyTable = metadataManager.getOpenKeyTable();
+      } else {
+        keyTable = metadataManager.getKeyTable();
+      }
+      List<OzoneAclInfo> newAcls = new ArrayList<>(keyInfo.getAcls());
+      OzoneAclInfo newAcl = null;
+      for(OzoneAclInfo a: keyInfo.getAcls()) {
+        if(a.getName().equals(acl.getName())) {
+          List<OzoneAclRights> rights =
+              new ArrayList<>(a.getRightsList());
+          for (IAccessAuthorizer.ACLType aclType : acl.getAclList()) {
+            rights.add(OzoneAclRights.valueOf(aclType.name()));
+          }
+          newAcl = OzoneAclInfo.newBuilder()
+              .setType(a.getType())
+              .setName(a.getName())
+              .addAllRights(rights)
+              .build();
+          newAcls.remove(a);
+          newAcls.add(newAcl);
+          break;
+        }
+      }
+      if(newAcl == null) {
+        newAcls.add(OzoneAcl.toProtobuf(acl));
+      }
+
+      OmKeyInfo newObj = new OmKeyInfo.Builder()
+          .setBucketName(keyInfo.getBucketName())
+          .setKeyName(keyInfo.getKeyName())
+          .setReplicationFactor(keyInfo.getFactor())
+          .setReplicationType(keyInfo.getType())
+          .setVolumeName(keyInfo.getVolumeName())
+          .setOmKeyLocationInfos(keyInfo.getKeyLocationVersions())
+          .setCreationTime(keyInfo.getCreationTime())
+          .setModificationTime(keyInfo.getModificationTime())
+          .setAcls(newAcls)
+          .setDataSize(keyInfo.getDataSize())
+          .setFileEncryptionInfo(keyInfo.getFileEncryptionInfo())
+          .build();
+      keyTable.put(objectKey, newObj);
+    } catch (IOException ex) {
+      if (!(ex instanceof OMException)) {
+        LOG.error("Add acl operation failed for key:{}/{}/{}", volume,
+            bucket, keyName, ex);
+      }
+      throw ex;
+    } finally {
+      metadataManager.getLock().releaseBucketLock(volume, bucket);
+    }
+    return true;
+  }
+
+  /**
+   * Remove acl for Ozone object. Return true if acl is removed successfully
+   * else false.
+   *
+   * @param obj Ozone object.
+   * @param acl Ozone acl to be removed.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public boolean removeAcl(OzoneObj obj, OzoneAcl acl) throws IOException {
+    validateOzoneObj(obj);
+    String volume = obj.getVolumeName();
+    String bucket = obj.getBucketName();
+    String keyName = obj.getKeyName();
+
+    metadataManager.getLock().acquireBucketLock(volume, bucket);
+    try {
+      validateBucket(volume, bucket);
+      String objectKey = metadataManager.getOzoneKey(volume, bucket, keyName);
+      OmKeyInfo keyInfo = metadataManager.getKeyTable().get(objectKey);
+      Table keyTable;
+      if (keyInfo == null) {
+        keyInfo = metadataManager.getOpenKeyTable().get(objectKey);
+        if (keyInfo == null) {
+          throw new OMException("Key not found. Key:" +
+              objectKey, KEY_NOT_FOUND);
+        }
+        keyTable = metadataManager.getOpenKeyTable();
+      } else {
+        keyTable = metadataManager.getKeyTable();
+      }
+
+      List<OzoneAclInfo> newAcls = new ArrayList<>(keyInfo.getAcls());
+      OzoneAclInfo newAcl = OzoneAcl.toProtobuf(acl);
+
+      if(newAcls.contains(OzoneAcl.toProtobuf(acl))) {
+        newAcls.remove(newAcl);
+      } else {
+        // Acl to be removed might be a subset of existing acls.
+        for(OzoneAclInfo a: keyInfo.getAcls()) {
+          if(a.getName().equals(acl.getName())) {
+            List<OzoneAclRights> rights =
+                new ArrayList<>(a.getRightsList());
+            for (IAccessAuthorizer.ACLType aclType : acl.getAclList()) {
+              rights.remove(OzoneAclRights.valueOf(aclType.name()));
+            }
+            newAcl = OzoneAclInfo.newBuilder()
+                .setType(a.getType())
+                .setName(a.getName())
+                .addAllRights(rights)
+                .build();
+            newAcls.remove(a);
+            newAcls.add(newAcl);
+            break;
+          }
+        }
+        if(newAcl == null) {
+          newAcls.add(OzoneAcl.toProtobuf(acl));
+        }
+      }
+
+      OmKeyInfo newObj = new OmKeyInfo.Builder()
+          .setBucketName(keyInfo.getBucketName())
+          .setKeyName(keyInfo.getKeyName())
+          .setReplicationFactor(keyInfo.getFactor())
+          .setReplicationType(keyInfo.getType())
+          .setVolumeName(keyInfo.getVolumeName())
+          .setOmKeyLocationInfos(keyInfo.getKeyLocationVersions())
+          .setCreationTime(keyInfo.getCreationTime())
+          .setModificationTime(keyInfo.getModificationTime())
+          .setAcls(newAcls)
+          .setDataSize(keyInfo.getDataSize())
+          .setFileEncryptionInfo(keyInfo.getFileEncryptionInfo())
+          .build();
+
+      keyTable.put(objectKey, newObj);
+    } catch (IOException ex) {
+      if (!(ex instanceof OMException)) {
+        LOG.error("Remove acl operation failed for key:{}/{}/{}", volume,
+            bucket, keyName, ex);
+      }
+      throw ex;
+    } finally {
+      metadataManager.getLock().releaseBucketLock(volume, bucket);
+    }
+    return true;
+  }
+
+  /**
+   * Acls to be set for given Ozone object. This operations reset ACL for given
+   * object to list of ACLs provided in argument.
+   *
+   * @param obj Ozone object.
+   * @param acls List of acls.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public boolean setAcl(OzoneObj obj, List<OzoneAcl> acls) throws IOException {
+    validateOzoneObj(obj);
+    String volume = obj.getVolumeName();
+    String bucket = obj.getBucketName();
+    String keyName = obj.getKeyName();
+
+    metadataManager.getLock().acquireBucketLock(volume, bucket);
+    try {
+      validateBucket(volume, bucket);
+      String objectKey = metadataManager.getOzoneKey(volume, bucket, keyName);
+      OmKeyInfo keyInfo = metadataManager.getKeyTable().get(objectKey);
+      Table keyTable;
+      if (keyInfo == null) {
+        keyInfo = metadataManager.getOpenKeyTable().get(objectKey);
+        if (keyInfo == null) {
+          throw new OMException("Key not found. Key:" +
+              objectKey, KEY_NOT_FOUND);
+        }
+        keyTable = metadataManager.getOpenKeyTable();
+      } else {
+        keyTable = metadataManager.getKeyTable();
+      }
+
+      List<OzoneAclInfo> newAcls = new ArrayList<>();
+      for (OzoneAcl a : acls) {
+        newAcls.add(OzoneAcl.toProtobuf(a));
+      }
+      OmKeyInfo newObj = new OmKeyInfo.Builder()
+          .setBucketName(keyInfo.getBucketName())
+          .setKeyName(keyInfo.getKeyName())
+          .setReplicationFactor(keyInfo.getFactor())
+          .setReplicationType(keyInfo.getType())
+          .setVolumeName(keyInfo.getVolumeName())
+          .setOmKeyLocationInfos(keyInfo.getKeyLocationVersions())
+          .setCreationTime(keyInfo.getCreationTime())
+          .setModificationTime(keyInfo.getModificationTime())
+          .setAcls(newAcls)
+          .setDataSize(keyInfo.getDataSize())
+          .setFileEncryptionInfo(keyInfo.getFileEncryptionInfo())
+          .build();
+
+      keyTable.put(objectKey, newObj);
+    } catch (IOException ex) {
+      if (!(ex instanceof OMException)) {
+        LOG.error("Set acl operation failed for key:{}/{}/{}", volume,
+            bucket, keyName, ex);
+      }
+      throw ex;
+    } finally {
+      metadataManager.getLock().releaseBucketLock(volume, bucket);
+    }
+    return true;
+  }
+
+  /**
+   * Returns list of ACLs for given Ozone object.
+   *
+   * @param obj Ozone object.
+   * @throws IOException if there is error.
+   */
+  @Override
+  public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException {
+    validateOzoneObj(obj);
+    String volume = obj.getVolumeName();
+    String bucket = obj.getBucketName();
+    String keyName = obj.getKeyName();
+
+    metadataManager.getLock().acquireBucketLock(volume, bucket);
+    try {
+      validateBucket(volume, bucket);
+      String objectKey = metadataManager.getOzoneKey(volume, bucket, keyName);
+      OmKeyInfo keyInfo = metadataManager.getKeyTable().get(objectKey);
+      if (keyInfo == null) {
+        keyInfo = metadataManager.getOpenKeyTable().get(objectKey);
+        if (keyInfo == null) {
+          throw new OMException("Key not found. Key:" +
+              objectKey, KEY_NOT_FOUND);
+        }
+      }
+
+      List<OzoneAcl> acls = new ArrayList<>();
+      for (OzoneAclInfo a : keyInfo.getAcls()) {
+        acls.add(OzoneAcl.fromProtobuf(a));
+      }
+      return acls;
+    } catch (IOException ex) {
+      if (!(ex instanceof OMException)) {
+        LOG.error("Get acl operation failed for key:{}/{}/{}", volume,
+            bucket, keyName, ex);
+      }
+      throw ex;
+    } finally {
+      metadataManager.getLock().releaseBucketLock(volume, bucket);
+    }
+  }
+
+  /**
+   * Helper method to validate ozone object.
+   * @param obj
+   * */
+  private void validateOzoneObj(OzoneObj obj) throws OMException {
+    Objects.requireNonNull(obj);
+
+    if (!obj.getResourceType().equals(KEY)) {
+      throw new IllegalArgumentException("Unexpected argument passed to " +
+          "KeyManager. OzoneObj type:" + obj.getResourceType());
+    }
+    String volume = obj.getVolumeName();
+    String bucket = obj.getBucketName();
+    String keyName = obj.getKeyName();
+
+    if (Strings.isNullOrEmpty(volume)) {
+      throw new OMException("Volume name is required.", VOLUME_NOT_FOUND);
+    }
+    if (Strings.isNullOrEmpty(bucket)) {
+      throw new OMException("Bucket name is required.", BUCKET_NOT_FOUND);
+    }
+    if (Strings.isNullOrEmpty(keyName)) {
+      throw new OMException("Key name is required.", KEY_NOT_FOUND);
+    }
+  }
+
   /**
    * OzoneFS api to get file status for an entry.
    *
@@ -1420,7 +1739,7 @@ public class KeyManagerImpl implements KeyManager {
         return;
       }
       OmKeyInfo dirDbKeyInfo =
-          createDirectoryKey(volumeName, bucketName, keyName);
+          createDirectoryKey(volumeName, bucketName, keyName, args.getAcls());
       String dirDbKey = metadataManager
           .getOzoneKey(volumeName, bucketName, dirDbKeyInfo.getKeyName());
       metadataManager.getKeyTable().put(dirDbKey, dirDbKeyInfo);
@@ -1430,7 +1749,7 @@ public class KeyManagerImpl implements KeyManager {
   }
 
   private OmKeyInfo createDirectoryKey(String volumeName, String bucketName,
-      String keyName) throws IOException {
+      String keyName, List<OzoneAcl> acls) throws IOException {
     // verify bucket exists
     OmBucketInfo bucketInfo = getBucketInfo(volumeName, bucketName);
 
@@ -1448,6 +1767,8 @@ public class KeyManagerImpl implements KeyManager {
         .setReplicationType(ReplicationType.RATIS)
         .setReplicationFactor(ReplicationFactor.ONE)
         .setFileEncryptionInfo(encInfo)
+        .setAcls(acls.stream().map(a ->
+            OzoneAcl.toProtobuf(a)).collect(Collectors.toList()))
         .build();
   }
 

+ 8 - 3
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java

@@ -2971,9 +2971,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     switch (obj.getResourceType()) {
     case VOLUME:
       return volumeManager.addAcl(obj, acl);
-
     case BUCKET:
       return bucketManager.addAcl(obj, acl);
+    case KEY:
+      return keyManager.addAcl(obj, acl);
     default:
       throw new OMException("Unexpected resource type: " +
           obj.getResourceType(), INVALID_REQUEST);
@@ -3001,6 +3002,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
 
     case BUCKET:
       return bucketManager.removeAcl(obj, acl);
+    case KEY:
+      return keyManager.removeAcl(obj, acl);
     default:
       throw new OMException("Unexpected resource type: " +
           obj.getResourceType(), INVALID_REQUEST);
@@ -3025,9 +3028,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     switch (obj.getResourceType()) {
     case VOLUME:
       return volumeManager.setAcl(obj, acls);
-
     case BUCKET:
       return bucketManager.setAcl(obj, acls);
+    case KEY:
+      return keyManager.setAcl(obj, acls);
     default:
       throw new OMException("Unexpected resource type: " +
           obj.getResourceType(), INVALID_REQUEST);
@@ -3050,9 +3054,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     switch (obj.getResourceType()) {
     case VOLUME:
       return volumeManager.getAcl(obj);
-
     case BUCKET:
       return bucketManager.getAcl(obj);
+    case KEY:
+      return keyManager.getAcl(obj);
     default:
       throw new OMException("Unexpected resource type: " +
           obj.getResourceType(), INVALID_REQUEST);

+ 2 - 2
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java

@@ -542,7 +542,7 @@ public class VolumeManagerImpl implements VolumeManager {
       try {
         volumeArgs.addAcl(acl);
       } catch (OMException ex) {
-        LOG.info("Add acl failed.", ex);
+        LOG.debug("Add acl failed.", ex);
         return false;
       }
       metadataManager.getVolumeTable().put(dbVolumeKey, volumeArgs);
@@ -592,7 +592,7 @@ public class VolumeManagerImpl implements VolumeManager {
       try {
         volumeArgs.removeAcl(acl);
       } catch (OMException ex) {
-        LOG.info("Remove acl failed.", ex);
+        LOG.debug("Remove acl failed.", ex);
         return false;
       }
       metadataManager.getVolumeTable().put(dbVolumeKey, volumeArgs);

+ 2 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java

@@ -170,6 +170,8 @@ public final class OzoneManagerRatisServer {
       omResponse.setMessage(stateMachineException.getCause().getMessage());
       omResponse.setStatus(parseErrorStatus(
           stateMachineException.getCause().getMessage()));
+      LOG.debug("Error while executing ratis request. " +
+          "stateMachineException: ", stateMachineException);
       return omResponse.build();
     }
 

+ 12 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java

@@ -576,6 +576,8 @@ public class OzoneManagerRequestHandler implements RequestHandler {
         .setIsMultipartKey(keyArgs.getIsMultipartKey())
         .setMultipartUploadID(keyArgs.getMultipartUploadID())
         .setMultipartUploadPartNumber(keyArgs.getMultipartNumber())
+        .setAcls(keyArgs.getAclsList().stream().map(a ->
+            OzoneAcl.fromProtobuf(a)).collect(Collectors.toList()))
         .build();
     if (keyArgs.hasDataSize()) {
       omKeyArgs.setDataSize(keyArgs.getDataSize());
@@ -825,6 +827,8 @@ public class OzoneManagerRequestHandler implements RequestHandler {
         .setBucketName(keyArgs.getBucketName())
         .setKeyName(keyArgs.getKeyName())
         .setType(keyArgs.getType())
+        .setAcls(keyArgs.getAclsList().stream().map(a ->
+            OzoneAcl.fromProtobuf(a)).collect(Collectors.toList()))
         .setFactor(keyArgs.getFactor())
         .build();
     OmMultipartInfo multipartInfo = impl.initiateMultipartUpload(omKeyArgs);
@@ -847,6 +851,8 @@ public class OzoneManagerRequestHandler implements RequestHandler {
         .setBucketName(keyArgs.getBucketName())
         .setKeyName(keyArgs.getKeyName())
         .setType(keyArgs.getType())
+        .setAcls(keyArgs.getAclsList().stream().map(a ->
+            OzoneAcl.fromProtobuf(a)).collect(Collectors.toList()))
         .setFactor(keyArgs.getFactor())
         .build();
     OmMultipartInfo multipartInfo =
@@ -905,6 +911,8 @@ public class OzoneManagerRequestHandler implements RequestHandler {
         .setVolumeName(keyArgs.getVolumeName())
         .setBucketName(keyArgs.getBucketName())
         .setKeyName(keyArgs.getKeyName())
+        .setAcls(keyArgs.getAclsList().stream().map(a ->
+            OzoneAcl.fromProtobuf(a)).collect(Collectors.toList()))
         .setMultipartUploadID(keyArgs.getMultipartUploadID())
         .build();
     OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo = impl
@@ -1050,6 +1058,8 @@ public class OzoneManagerRequestHandler implements RequestHandler {
         .setVolumeName(keyArgs.getVolumeName())
         .setBucketName(keyArgs.getBucketName())
         .setKeyName(keyArgs.getKeyName())
+        .setAcls(keyArgs.getAclsList().stream().map(a ->
+            OzoneAcl.fromProtobuf(a)).collect(Collectors.toList()))
         .build();
     impl.createDirectory(omKeyArgs);
   }
@@ -1064,6 +1074,8 @@ public class OzoneManagerRequestHandler implements RequestHandler {
         .setDataSize(keyArgs.getDataSize())
         .setType(keyArgs.getType())
         .setFactor(keyArgs.getFactor())
+        .setAcls(keyArgs.getAclsList().stream().map(a ->
+            OzoneAcl.fromProtobuf(a)).collect(Collectors.toList()))
         .build();
     OpenKeySession keySession =
         impl.createFile(omKeyArgs, request.getIsOverwrite(),

+ 2 - 0
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java

@@ -22,6 +22,7 @@ package org.apache.hadoop.ozone.om;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -192,6 +193,7 @@ public class TestKeyDeletingService {
               .setVolumeName(volumeName)
               .setBucketName(bucketName)
               .setKeyName(keyName)
+              .setAcls(Collections.emptyList())
               .setLocationInfoList(new ArrayList<>())
               .build();
       //Open, Commit and Delete the Keys in the Key Manager.

+ 11 - 2
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java

@@ -49,6 +49,8 @@ import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.*;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 
@@ -60,6 +62,7 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 import static org.apache.hadoop.ozone.OzoneConfigKeys.*;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
 
 /**
  * Test class for @{@link KeyManagerImpl}.
@@ -173,11 +176,14 @@ public class TestKeyManagerImpl {
 
   @Test
   public void openKeyFailureInSafeMode() throws Exception {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     KeyManager keyManager1 = new KeyManagerImpl(mockScmBlockLocationProtocol,
         metadataManager, conf, "om1", null);
     OmKeyArgs keyArgs = createBuilder()
         .setKeyName(KEY_NAME)
         .setDataSize(1000)
+        .setAcls(OzoneUtils.getAclList(ugi.getUserName(), ugi.getGroups(),
+            ALL, ALL))
         .build();
     LambdaTestUtils.intercept(OMException.class,
         "SafeModePrecheck failed for allocateBlock", () -> {
@@ -355,7 +361,7 @@ public class TestKeyManagerImpl {
     }
   }
 
-  private OmKeyArgs createKeyArgs(String toKeyName) {
+  private OmKeyArgs createKeyArgs(String toKeyName) throws IOException {
     return createBuilder().setKeyName(toKeyName).build();
   }
 
@@ -542,12 +548,15 @@ public class TestKeyManagerImpl {
     return keyNames;
   }
 
-  private OmKeyArgs.Builder createBuilder() {
+  private OmKeyArgs.Builder createBuilder() throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     return new OmKeyArgs.Builder()
         .setBucketName(BUCKET_NAME)
         .setFactor(ReplicationFactor.ONE)
         .setDataSize(0)
         .setType(ReplicationType.STAND_ALONE)
+        .setAcls(OzoneUtils.getAclList(ugi.getUserName(), ugi.getGroups(),
+            ALL, ALL))
         .setVolumeName(VOLUME_NAME);
   }
 }