Browse Source

HDDS-1975. Implement default acls for bucket/volume/key for OM HA code. (#1315)

Bharat Viswanadham 5 years ago
parent
commit
d1aa8596e0
17 changed files with 192 additions and 142 deletions
  1. 0 9
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
  2. 3 1
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
  3. 36 1
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java
  4. 1 7
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
  5. 4 10
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
  6. 1 7
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
  7. 1 7
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
  8. 6 10
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
  9. 1 7
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java
  10. 1 7
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequest.java
  11. 96 11
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
  12. 37 20
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/bucket/S3BucketCreateRequest.java
  13. 1 9
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/bucket/S3BucketDeleteRequest.java
  14. 1 9
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
  15. 1 9
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java
  16. 1 9
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java
  17. 1 9
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java

+ 0 - 9
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java

@@ -119,7 +119,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeFalse;
 
 import org.junit.Ignore;
 import org.junit.Test;
@@ -2221,8 +2220,6 @@ public abstract class TestOzoneRpcClientAbstract {
 
   @Test
   public void testNativeAclsForVolume() throws Exception {
-    assumeFalse("Remove this once ACL HA is supported",
-        getClass().equals(TestOzoneRpcClientWithRatis.class));
     String volumeName = UUID.randomUUID().toString();
     store.createVolume(volumeName);
 
@@ -2237,8 +2234,6 @@ public abstract class TestOzoneRpcClientAbstract {
 
   @Test
   public void testNativeAclsForBucket() throws Exception {
-    assumeFalse("Remove this once ACL HA is supported",
-        getClass().equals(TestOzoneRpcClientWithRatis.class));
     String volumeName = UUID.randomUUID().toString();
     String bucketName = UUID.randomUUID().toString();
 
@@ -2299,8 +2294,6 @@ public abstract class TestOzoneRpcClientAbstract {
 
   @Test
   public void testNativeAclsForKey() throws Exception {
-    assumeFalse("Remove this once ACL HA is supported",
-        getClass().equals(TestOzoneRpcClientWithRatis.class));
     String volumeName = UUID.randomUUID().toString();
     String bucketName = UUID.randomUUID().toString();
     String key1 = "dir1/dir2" + UUID.randomUUID().toString();
@@ -2363,8 +2356,6 @@ public abstract class TestOzoneRpcClientAbstract {
 
   @Test
   public void testNativeAclsForPrefix() throws Exception {
-    assumeFalse("Remove this once ACL HA is supported",
-        getClass().equals(TestOzoneRpcClientWithRatis.class));
     String volumeName = UUID.randomUUID().toString();
     String bucketName = UUID.randomUUID().toString();
 

+ 3 - 1
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java

@@ -26,6 +26,7 @@ import java.util.Map;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.audit.AuditAction;
@@ -142,7 +143,8 @@ public abstract class OMClientRequest implements RequestAuditor {
    */
   @VisibleForTesting
   public UserGroupInformation createUGI() {
-    if (omRequest.hasUserInfo()) {
+    if (omRequest.hasUserInfo() &&
+        !StringUtils.isBlank(omRequest.getUserInfo().getUserName())) {
       return UserGroupInformation.createRemoteUser(
           omRequest.getUserInfo().getUserName());
     } else {

+ 36 - 1
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java

@@ -19,8 +19,14 @@
 package org.apache.hadoop.ozone.om.request.bucket;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
 
 import com.google.common.base.Optional;
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -146,8 +152,11 @@ public class OMBucketCreateRequest extends OMClientRequest {
           volumeName);
       acquiredBucketLock = metadataManager.getLock().acquireLock(BUCKET_LOCK,
           volumeName, bucketName);
+
+      OmVolumeArgs omVolumeArgs =
+          metadataManager.getVolumeTable().get(volumeKey);
       //Check if the volume exists
-      if (metadataManager.getVolumeTable().get(volumeKey) == null) {
+      if (omVolumeArgs == null) {
         LOG.debug("volume: {} not found ", volumeName);
         throw new OMException("Volume doesn't exist",
             OMException.ResultCodes.VOLUME_NOT_FOUND);
@@ -160,6 +169,9 @@ public class OMBucketCreateRequest extends OMClientRequest {
             OMException.ResultCodes.BUCKET_ALREADY_EXISTS);
       }
 
+      // Add default acls from volume.
+      addDefaultAcls(omBucketInfo, omVolumeArgs);
+
       // Update table cache.
       metadataManager.getBucketTable().addCacheEntry(new CacheKey<>(bucketKey),
           new CacheValue<>(Optional.of(omBucketInfo), transactionLogIndex));
@@ -205,6 +217,29 @@ public class OMBucketCreateRequest extends OMClientRequest {
   }
 
 
+  /**
+   * Add default acls for bucket. These acls are inherited from volume
+   * default acl list.
+   * @param omBucketInfo
+   * @param omVolumeArgs
+   */
+  private void addDefaultAcls(OmBucketInfo omBucketInfo,
+      OmVolumeArgs omVolumeArgs) {
+    // Add default acls from volume.
+    List<OzoneAcl> acls = new ArrayList<>();
+    if (omBucketInfo.getAcls() != null) {
+      acls.addAll(omBucketInfo.getAcls());
+    }
+
+    List<OzoneAcl> defaultVolumeAclList = omVolumeArgs.getAclMap()
+        .getDefaultAclList().stream().map(OzoneAcl::fromProtobuf)
+        .collect(Collectors.toList());
+
+    OzoneAclUtil.inheritDefaultAcls(acls, defaultVolumeAclList);
+    omBucketInfo.setAcls(acls);
+  }
+
+
   private BucketInfo getBucketInfoFromRequest() {
     CreateBucketRequest createBucketRequest =
         getOmRequest().getCreateBucketRequest();

+ 1 - 7
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java

@@ -57,8 +57,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMResponse;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.db.cache.CacheKey;
 import org.apache.hadoop.utils.db.cache.CacheValue;
@@ -129,11 +127,7 @@ public class OMDirectoryCreateRequest extends OMKeyRequest {
     OMClientResponse omClientResponse = null;
     try {
       // check Acl
-      if (ozoneManager.getAclsEnabled()) {
-        checkAcls(ozoneManager, OzoneObj.ResourceType.BUCKET,
-            OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE,
-            volumeName, bucketName, keyName);
-      }
+      checkBucketAcls(ozoneManager, volumeName, bucketName, keyName);
 
       // Check if this is the root of the filesystem.
       if (keyName.length() == 0) {

+ 4 - 10
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java

@@ -53,8 +53,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .KeyArgs;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.UniqueId;
 import org.apache.hadoop.utils.db.Table;
@@ -179,11 +177,7 @@ public class OMFileCreateRequest extends OMKeyRequest {
     OMClientResponse omClientResponse = null;
     try {
       // check Acl
-      if (ozoneManager.getAclsEnabled()) {
-        checkAcls(ozoneManager, OzoneObj.ResourceType.BUCKET,
-            OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE,
-            volumeName, bucketName, keyName);
-      }
+      checkBucketAcls(ozoneManager, volumeName, bucketName, keyName);
 
       // acquire lock
       acquiredLock = omMetadataManager.getLock().acquireLock(BUCKET_LOCK,
@@ -265,20 +259,20 @@ public class OMFileCreateRequest extends OMKeyRequest {
       omKeyInfo = prepareKeyInfo(omMetadataManager, keyArgs,
           omMetadataManager.getOzoneKey(volumeName, bucketName,
               keyName), keyArgs.getDataSize(), locations,
-          encryptionInfo.orNull());
+          encryptionInfo.orNull(), ozoneManager.getPrefixManager(), bucketInfo);
 
       omClientResponse =  prepareCreateKeyResponse(keyArgs, omKeyInfo,
           locations, encryptionInfo.orNull(), exception,
           createFileRequest.getClientID(), transactionLogIndex, volumeName,
           bucketName, keyName, ozoneManager,
-          OMAction.CREATE_FILE);
+          OMAction.CREATE_FILE, ozoneManager.getPrefixManager(), bucketInfo);
     } catch (IOException ex) {
       exception = ex;
       omClientResponse =  prepareCreateKeyResponse(keyArgs, omKeyInfo,
           locations, encryptionInfo.orNull(), exception,
           createFileRequest.getClientID(), transactionLogIndex,
           volumeName, bucketName, keyName, ozoneManager,
-          OMAction.CREATE_FILE);
+          OMAction.CREATE_FILE, ozoneManager.getPrefixManager(), null);
     } finally {
       if (omClientResponse != null) {
         omClientResponse.setFlushFuture(

+ 1 - 7
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java

@@ -53,8 +53,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMResponse;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.utils.db.cache.CacheKey;
 import org.apache.hadoop.utils.db.cache.CacheValue;
 
@@ -171,11 +169,7 @@ public class OMAllocateBlockRequest extends OMKeyRequest {
     OmKeyInfo omKeyInfo = null;
     try {
       // check Acl
-      if (ozoneManager.getAclsEnabled()) {
-        checkAcls(ozoneManager, OzoneObj.ResourceType.KEY,
-            OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE,
-            volumeName, bucketName, keyName);
-      }
+      checkBucketAcls(ozoneManager, volumeName, bucketName, keyName);
 
       OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
       validateBucketAndVolume(omMetadataManager, volumeName,

+ 1 - 7
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java

@@ -48,8 +48,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .KeyArgs;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.db.cache.CacheKey;
 import org.apache.hadoop.utils.db.cache.CacheValue;
@@ -117,11 +115,7 @@ public class OMKeyCommitRequest extends OMKeyRequest {
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
     try {
       // check Acl
-      if (ozoneManager.getAclsEnabled()) {
-        checkAcls(ozoneManager, OzoneObj.ResourceType.KEY,
-            OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE,
-            volumeName, bucketName, keyName);
-      }
+      checkBucketAcls(ozoneManager, volumeName, bucketName, keyName);
 
       List<OmKeyLocationInfo> locationInfoList = commitKeyArgs
           .getKeyLocationsList().stream()

+ 6 - 10
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java

@@ -47,8 +47,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .KeyArgs;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.UniqueId;
 
@@ -164,11 +162,7 @@ public class OMKeyCreateRequest extends OMKeyRequest {
     OMClientResponse omClientResponse = null;
     try {
       // check Acl
-      if (ozoneManager.getAclsEnabled()) {
-        checkAcls(ozoneManager, OzoneObj.ResourceType.KEY,
-            OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE,
-            volumeName, bucketName, keyName);
-      }
+      checkBucketAcls(ozoneManager, volumeName, bucketName, keyName);
 
       acquireLock = omMetadataManager.getLock().acquireLock(BUCKET_LOCK,
           volumeName, bucketName);
@@ -184,17 +178,19 @@ public class OMKeyCreateRequest extends OMKeyRequest {
 
       omKeyInfo = prepareKeyInfo(omMetadataManager, keyArgs,
           omMetadataManager.getOzoneKey(volumeName, bucketName, keyName),
-          keyArgs.getDataSize(), locations, encryptionInfo.orNull());
+          keyArgs.getDataSize(), locations, encryptionInfo.orNull(),
+          ozoneManager.getPrefixManager(), bucketInfo);
       omClientResponse = prepareCreateKeyResponse(keyArgs, omKeyInfo,
           locations, encryptionInfo.orNull(), exception,
           createKeyRequest.getClientID(), transactionLogIndex, volumeName,
-          bucketName, keyName, ozoneManager, OMAction.ALLOCATE_KEY);
+          bucketName, keyName, ozoneManager, OMAction.ALLOCATE_KEY,
+          ozoneManager.getPrefixManager(), bucketInfo);
     } catch (IOException ex) {
       exception = ex;
       omClientResponse = prepareCreateKeyResponse(keyArgs, omKeyInfo, locations,
           encryptionInfo.orNull(), exception, createKeyRequest.getClientID(),
           transactionLogIndex, volumeName, bucketName, keyName, ozoneManager,
-          OMAction.ALLOCATE_KEY);
+          OMAction.ALLOCATE_KEY, ozoneManager.getPrefixManager(), null);
     } finally {
       if (omClientResponse != null) {
         omClientResponse.setFlushFuture(

+ 1 - 7
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java

@@ -43,8 +43,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .DeleteKeyResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.db.cache.CacheKey;
 import org.apache.hadoop.utils.db.cache.CacheValue;
@@ -111,11 +109,7 @@ public class OMKeyDeleteRequest extends OMKeyRequest {
     OMClientResponse omClientResponse = null;
     try {
       // check Acl
-      if (ozoneManager.getAclsEnabled()) {
-        checkAcls(ozoneManager, OzoneObj.ResourceType.KEY,
-            OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.DELETE,
-            volumeName, bucketName, keyName);
-      }
+      checkKeyAcls(ozoneManager, volumeName, bucketName, keyName);
 
       String objectKey = omMetadataManager.getOzoneKey(
           volumeName, bucketName, keyName);

+ 1 - 7
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequest.java

@@ -44,8 +44,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .RenameKeyRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .RenameKeyResponse;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.db.Table;
 import org.apache.hadoop.utils.db.cache.CacheKey;
@@ -120,11 +118,7 @@ public class OMKeyRenameRequest extends OMKeyRequest {
             OMException.ResultCodes.INVALID_KEY_NAME);
       }
       // check Acl
-      if (ozoneManager.getAclsEnabled()) {
-        checkAcls(ozoneManager, OzoneObj.ResourceType.KEY,
-            OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE,
-            volumeName, bucketName, fromKeyName);
-      }
+      checkKeyAcls(ozoneManager, volumeName, bucketName, fromKeyName);
 
       acquiredLock = omMetadataManager.getLock().acquireLock(BUCKET_LOCK,
           volumeName, bucketName);

+ 96 - 11
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java

@@ -32,12 +32,17 @@ import java.util.stream.Collectors;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.om.PrefixManager;
 import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
 import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,6 +83,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.utils.db.cache.CacheKey;
 import org.apache.hadoop.utils.db.cache.CacheValue;
 
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes
     .BUCKET_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes
@@ -248,7 +254,9 @@ public abstract class OMKeyRequest extends OMClientRequest {
       FileEncryptionInfo encryptionInfo, @Nullable IOException exception,
       long clientID, long transactionLogIndex, @Nonnull String volumeName,
       @Nonnull String bucketName, @Nonnull String keyName,
-      @Nonnull OzoneManager ozoneManager, @Nonnull OMAction omAction) {
+      @Nonnull OzoneManager ozoneManager, @Nonnull OMAction omAction,
+      @Nonnull PrefixManager prefixManager,
+      @Nullable OmBucketInfo omBucketInfo) {
 
     OMResponse.Builder omResponse = OMResponse.newBuilder()
         .setStatus(OzoneManagerProtocolProtos.Status.OK);
@@ -263,7 +271,7 @@ public abstract class OMKeyRequest extends OMClientRequest {
         // version 0
         omKeyInfo = createKeyInfo(keyArgs, locations, keyArgs.getFactor(),
             keyArgs.getType(), keyArgs.getDataSize(),
-            encryptionInfo);
+            encryptionInfo, prefixManager, omBucketInfo);
       }
 
       long openVersion = omKeyInfo.getLatestVersionLocations().getVersion();
@@ -335,12 +343,15 @@ public abstract class OMKeyRequest extends OMClientRequest {
    * Create OmKeyInfo object.
    * @return OmKeyInfo
    */
+  @SuppressWarnings("parameterNumber")
   protected OmKeyInfo createKeyInfo(@Nonnull KeyArgs keyArgs,
       @Nonnull List<OmKeyLocationInfo> locations,
       @Nonnull HddsProtos.ReplicationFactor factor,
       @Nonnull HddsProtos.ReplicationType type, long size,
-      @Nullable FileEncryptionInfo encInfo) {
-    OmKeyInfo.Builder builder = new OmKeyInfo.Builder()
+      @Nullable FileEncryptionInfo encInfo,
+      @Nonnull PrefixManager prefixManager,
+      @Nullable OmBucketInfo omBucketInfo) {
+    return new OmKeyInfo.Builder()
         .setVolumeName(keyArgs.getVolumeName())
         .setBucketName(keyArgs.getBucketName())
         .setKeyName(keyArgs.getKeyName())
@@ -351,11 +362,46 @@ public abstract class OMKeyRequest extends OMClientRequest {
         .setDataSize(size)
         .setReplicationType(type)
         .setReplicationFactor(factor)
-        .setFileEncryptionInfo(encInfo);
+        .setFileEncryptionInfo(encInfo)
+        .setAcls(getAclsForKey(keyArgs, omBucketInfo, prefixManager)).build();
+  }
+
+  private List< OzoneAcl > getAclsForKey(KeyArgs keyArgs,
+      OmBucketInfo bucketInfo, PrefixManager prefixManager) {
+    List<OzoneAcl> acls = new ArrayList<>();
+
     if(keyArgs.getAclsList() != null) {
-      builder.setAcls(OzoneAclUtil.fromProtobuf(keyArgs.getAclsList()));
+      acls.addAll(OzoneAclUtil.fromProtobuf(keyArgs.getAclsList()));
     }
-    return builder.build();
+
+    // Inherit DEFAULT acls from prefix.
+    if(prefixManager != null) {
+      List< OmPrefixInfo > prefixList = prefixManager.getLongestPrefixPath(
+          OZONE_URI_DELIMITER +
+              keyArgs.getVolumeName() + OZONE_URI_DELIMITER +
+              keyArgs.getBucketName() + OZONE_URI_DELIMITER +
+              keyArgs.getKeyName());
+
+      if(prefixList.size() > 0) {
+        // Add all acls from direct parent to key.
+        OmPrefixInfo prefixInfo = prefixList.get(prefixList.size() - 1);
+        if(prefixInfo  != null) {
+          if (OzoneAclUtil.inheritDefaultAcls(acls, prefixInfo.getAcls())) {
+            return acls;
+          }
+        }
+      }
+    }
+
+    // Inherit DEFAULT acls from bucket only if DEFAULT acls for
+    // prefix are not set.
+    if (bucketInfo != null) {
+      if (OzoneAclUtil.inheritDefaultAcls(acls, bucketInfo.getAcls())) {
+        return acls;
+      }
+    }
+
+    return acls;
   }
 
   /**
@@ -363,16 +409,18 @@ public abstract class OMKeyRequest extends OMClientRequest {
    * @return OmKeyInfo
    * @throws IOException
    */
+  @SuppressWarnings("parameternumber")
   protected OmKeyInfo prepareKeyInfo(
       @Nonnull OMMetadataManager omMetadataManager,
       @Nonnull KeyArgs keyArgs, @Nonnull String dbKeyName, long size,
       @Nonnull List<OmKeyLocationInfo> locations,
-      @Nullable FileEncryptionInfo encInfo)
+      @Nullable FileEncryptionInfo encInfo,
+      @Nonnull PrefixManager prefixManager, @Nullable OmBucketInfo omBucketInfo)
       throws IOException {
     OmKeyInfo keyInfo = null;
     if (keyArgs.getIsMultipartKey()) {
       keyInfo = prepareMultipartKeyInfo(omMetadataManager, keyArgs, size,
-          locations, encInfo);
+          locations, encInfo, prefixManager, omBucketInfo);
       //TODO args.getMetadata
     } else if (omMetadataManager.getKeyTable().isExist(dbKeyName)) {
       // TODO: Need to be fixed, as when key already exists, we are
@@ -400,7 +448,8 @@ public abstract class OMKeyRequest extends OMClientRequest {
       @Nonnull OMMetadataManager omMetadataManager,
       @Nonnull KeyArgs args, long size,
       @Nonnull List<OmKeyLocationInfo> locations,
-      FileEncryptionInfo encInfo) throws IOException {
+      FileEncryptionInfo encInfo,  @Nonnull PrefixManager prefixManager,
+      @Nullable OmBucketInfo omBucketInfo) throws IOException {
     HddsProtos.ReplicationFactor factor;
     HddsProtos.ReplicationType type;
 
@@ -427,7 +476,8 @@ public abstract class OMKeyRequest extends OMClientRequest {
     }
     // For this upload part we don't need to check in KeyTable. As this
     // is not an actual key, it is a part of the key.
-    return createKeyInfo(args, locations, factor, type, size, encInfo);
+    return createKeyInfo(args, locations, factor, type, size, encInfo,
+        prefixManager, omBucketInfo);
   }
 
 
@@ -447,4 +497,39 @@ public abstract class OMKeyRequest extends OMClientRequest {
     }
   }
 
+  /**
+   * Check Acls for the ozone bucket.
+   * @param ozoneManager
+   * @param volume
+   * @param bucket
+   * @param key
+   * @throws IOException
+   */
+  protected void checkBucketAcls(OzoneManager ozoneManager, String volume,
+      String bucket, String key) throws IOException {
+    if (ozoneManager.getAclsEnabled()) {
+      checkAcls(ozoneManager, OzoneObj.ResourceType.KEY,
+          OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE,
+          volume, bucket, key);
+    }
+  }
+
+
+  /**
+   * Check Acls for the ozone key.
+   * @param ozoneManager
+   * @param volume
+   * @param bucket
+   * @param key
+   * @throws IOException
+   */
+  protected void checkKeyAcls(OzoneManager ozoneManager, String volume,
+      String bucket, String key) throws IOException {
+    if (ozoneManager.getAclsEnabled()) {
+      checkAcls(ozoneManager, OzoneObj.ResourceType.KEY,
+          OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE,
+          volume, bucket, key);
+    }
+  }
+
 }

+ 37 - 20
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/bucket/S3BucketCreateRequest.java

@@ -20,12 +20,15 @@ package org.apache.hadoop.ozone.om.request.s3.bucket;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,8 +59,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .S3CreateVolumeInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .VolumeList;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.db.cache.CacheKey;
 import org.apache.hadoop.utils.db.cache.CacheValue;
@@ -151,13 +152,8 @@ public class S3BucketCreateRequest extends OMVolumeRequest {
     String volumeName = formatOzoneVolumeName(userName);
     OMClientResponse omClientResponse = null;
     try {
-      // check Acl
-      if (ozoneManager.getAclsEnabled()) {
-        checkAcls(ozoneManager, OzoneObj.ResourceType.BUCKET,
-            OzoneObj.StoreType.S3, IAccessAuthorizer.ACLType.CREATE, null,
-            s3BucketName, null);
-      }
 
+      // TODO to support S3 ACL later.
       acquiredS3Lock = omMetadataManager.getLock().acquireLock(S3_BUCKET_LOCK,
           s3BucketName);
 
@@ -202,7 +198,7 @@ public class S3BucketCreateRequest extends OMVolumeRequest {
       // check if ozone bucket exists, if it does not exist create ozone
       // bucket
       OmBucketInfo omBucketInfo = createBucket(omMetadataManager, volumeName,
-          s3BucketName,
+          s3BucketName, userName,
           s3CreateBucketRequest.getS3CreateVolumeInfo().getCreationTime(),
           transactionLogIndex);
 
@@ -262,8 +258,8 @@ public class S3BucketCreateRequest extends OMVolumeRequest {
 
 
   private OmBucketInfo createBucket(OMMetadataManager omMetadataManager,
-      String volumeName, String s3BucketName, long creationTime,
-      long transactionLogIndex) throws IOException {
+      String volumeName, String s3BucketName, String userName,
+      long creationTime, long transactionLogIndex) throws IOException {
     // check if ozone bucket exists, if it does not exist create ozone
     // bucket
     boolean acquireBucketLock = false;
@@ -275,7 +271,7 @@ public class S3BucketCreateRequest extends OMVolumeRequest {
       String bucketKey = omMetadataManager.getBucketKey(volumeName,
           s3BucketName);
       if (!omMetadataManager.getBucketTable().isExist(bucketKey)) {
-        omBucketInfo = createOmBucketInfo(volumeName, s3BucketName,
+        omBucketInfo = createOmBucketInfo(volumeName, s3BucketName, userName,
             creationTime);
         // Add to bucket table cache.
         omMetadataManager.getBucketTable().addCacheEntry(
@@ -329,12 +325,19 @@ public class S3BucketCreateRequest extends OMVolumeRequest {
    * @return {@link OmVolumeArgs}
    */
   private OmVolumeArgs createOmVolumeArgs(String volumeName, String userName,
-      long creationTime) {
-    return OmVolumeArgs.newBuilder()
+      long creationTime) throws IOException {
+    OmVolumeArgs.Builder builder = OmVolumeArgs.newBuilder()
         .setAdminName(S3_ADMIN_NAME).setVolume(volumeName)
         .setQuotaInBytes(OzoneConsts.MAX_QUOTA_IN_BYTES)
         .setOwnerName(userName)
-        .setCreationTime(creationTime).build();
+        .setCreationTime(creationTime);
+
+    // Set default acls.
+    for (OzoneAcl acl : getDefaultAcls(userName)) {
+      builder.addOzoneAcls(OzoneAcl.toProtobuf(acl));
+    }
+
+    return builder.build();
   }
 
   /**
@@ -346,13 +349,18 @@ public class S3BucketCreateRequest extends OMVolumeRequest {
    * @return {@link OmBucketInfo}
    */
   private OmBucketInfo createOmBucketInfo(String volumeName,
-      String s3BucketName, long creationTime) {
+      String s3BucketName, String userName, long creationTime) {
     //TODO: Now S3Bucket API takes only bucketName as param. In future if we
     // support some configurable options we need to fix this.
-    return OmBucketInfo.newBuilder().setVolumeName(volumeName)
-        .setBucketName(s3BucketName).setIsVersionEnabled(Boolean.FALSE)
-        .setStorageType(StorageType.DEFAULT).setCreationTime(creationTime)
-        .build();
+    OmBucketInfo.Builder builder =
+        OmBucketInfo.newBuilder().setVolumeName(volumeName)
+            .setBucketName(s3BucketName).setIsVersionEnabled(Boolean.FALSE)
+            .setStorageType(StorageType.DEFAULT).setCreationTime(creationTime);
+
+    // Set default acls.
+    builder.setAcls(getDefaultAcls(userName));
+
+    return builder.build();
   }
 
   /**
@@ -368,5 +376,14 @@ public class S3BucketCreateRequest extends OMVolumeRequest {
     auditMap.put(s3BucketName, OzoneConsts.S3_BUCKET);
     return auditMap;
   }
+
+  /**
+   * Get default acls.
+   * */
+  private List<OzoneAcl> getDefaultAcls(String userName) {
+    UserGroupInformation ugi = createUGI();
+    return OzoneAcl.parseAcls("user:" + (ugi == null ? userName :
+        ugi.getUserName()) + ":a,user:" + S3_ADMIN_NAME + ":a");
+  }
 }
 

+ 1 - 9
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/bucket/S3BucketDeleteRequest.java

@@ -43,8 +43,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .S3DeleteBucketRequest;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.utils.db.cache.CacheKey;
 import org.apache.hadoop.utils.db.cache.CacheValue;
 
@@ -107,13 +105,7 @@ public class S3BucketDeleteRequest extends OMVolumeRequest {
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
     OMClientResponse omClientResponse = null;
     try {
-      // check Acl
-      if (ozoneManager.getAclsEnabled()) {
-        checkAcls(ozoneManager, OzoneObj.ResourceType.BUCKET,
-            OzoneObj.StoreType.S3, IAccessAuthorizer.ACLType.DELETE, null,
-            s3BucketName, null);
-      }
-
+      // TODO to support S3 ACL later.
       acquiredS3Lock = omMetadataManager.getLock().acquireLock(S3_BUCKET_LOCK,
           s3BucketName);
 

+ 1 - 9
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java

@@ -36,8 +36,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Multipa
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartInfoInitiateResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.UniqueId;
 import org.apache.hadoop.utils.db.cache.CacheKey;
@@ -114,13 +112,7 @@ public class S3InitiateMultipartUploadRequest extends OMKeyRequest {
         .setSuccess(true);
     OMClientResponse omClientResponse = null;
     try {
-      // check Acl
-      if (ozoneManager.getAclsEnabled()) {
-        checkAcls(ozoneManager, OzoneObj.ResourceType.KEY,
-            OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE,
-            volumeName, bucketName, keyName);
-      }
-
+      // TODO to support S3 ACL later.
       acquiredBucketLock =
           omMetadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName,
               bucketName);

+ 1 - 9
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java

@@ -44,8 +44,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMResponse;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.db.cache.CacheKey;
 import org.apache.hadoop.utils.db.cache.CacheValue;
@@ -98,13 +96,7 @@ public class S3MultipartUploadAbortRequest extends OMKeyRequest {
         .setSuccess(true);
     OMClientResponse omClientResponse = null;
     try {
-      // check Acl
-      if (ozoneManager.getAclsEnabled()) {
-        checkAcls(ozoneManager, OzoneObj.ResourceType.KEY,
-            OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE,
-            volumeName, bucketName, keyName);
-      }
-
+      // TODO to support S3 ACL later.
       acquiredLock =
           omMetadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName,
               bucketName);

+ 1 - 9
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java

@@ -40,8 +40,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMResponse;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.db.cache.CacheKey;
 import org.apache.hadoop.utils.db.cache.CacheValue;
@@ -111,13 +109,7 @@ public class S3MultipartUploadCommitPartRequest extends OMKeyRequest {
     String multipartKey = null;
     OmMultipartKeyInfo multipartKeyInfo = null;
     try {
-      // check Acl
-      if (ozoneManager.getAclsEnabled()) {
-        checkAcls(ozoneManager, OzoneObj.ResourceType.KEY,
-            OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE,
-            volumeName, bucketName, keyName);
-      }
-
+      // TODO to support S3 ACL later.
       acquiredLock =
           omMetadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName,
               bucketName);

+ 1 - 9
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java

@@ -59,8 +59,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .PartKeyInfo;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.db.cache.CacheKey;
 import org.apache.hadoop.utils.db.cache.CacheValue;
@@ -125,13 +123,7 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
     IOException exception = null;
     OmMultipartUploadList multipartUploadList = null;
     try {
-      // check Acl
-      if (ozoneManager.getAclsEnabled()) {
-        checkAcls(ozoneManager, OzoneObj.ResourceType.KEY,
-            OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE,
-            volumeName, bucketName, keyName);
-      }
-
+      // TODO to support S3 ACL later.
       TreeMap<Integer, String> partsMap = new TreeMap<>();
       for (OzoneManagerProtocolProtos.Part part : partsList) {
         partsMap.put(part.getPartNumber(), part.getPartName());