Browse Source

HDDS-1911. Support Prefix ACL operations for OM HA. (#1275)

Bharat Viswanadham 6 years ago
parent
commit
c8675ec42e
13 changed files with 977 additions and 122 deletions
  1. 1 1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
  2. 77 2
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
  3. 1 1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneNativeAuthorizer.java
  4. 1 1
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
  5. 212 114
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManagerImpl.java
  6. 10 3
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
  7. 197 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixAclRequest.java
  8. 122 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixAddAclRequest.java
  9. 119 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixRemoveAclRequest.java
  10. 120 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixSetAclRequest.java
  11. 22 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/package-info.java
  12. 71 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/acl/prefix/OMPrefixAclResponse.java
  13. 24 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/acl/prefix/package-info.java

+ 1 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java

@@ -159,7 +159,7 @@ public class TestKeyManagerImpl {
     keyManager =
     keyManager =
         new KeyManagerImpl(scm.getBlockProtocolServer(), metadataManager, conf,
         new KeyManagerImpl(scm.getBlockProtocolServer(), metadataManager, conf,
             "om1", null);
             "om1", null);
-    prefixManager = new PrefixManagerImpl(metadataManager);
+    prefixManager = new PrefixManagerImpl(metadataManager, false);
 
 
     Mockito.when(mockScmBlockLocationProtocol
     Mockito.when(mockScmBlockLocationProtocol
         .allocateBlock(Mockito.anyLong(), Mockito.anyInt(),
         .allocateBlock(Mockito.anyLong(), Mockito.anyInt(),

+ 77 - 2
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java

@@ -70,6 +70,7 @@ import org.apache.hadoop.util.Time;
 
 
 import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl
 import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl
     .NODE_FAILURE_TIMEOUT;
     .NODE_FAILURE_TIMEOUT;
+import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
 import static org.apache.hadoop.ozone.OzoneAcl.AclScope.DEFAULT;
 import static org.apache.hadoop.ozone.OzoneAcl.AclScope.DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConfigKeys
 import static org.apache.hadoop.ozone.OzoneConfigKeys
@@ -899,13 +900,87 @@ public class TestOzoneManagerHA {
 
 
   }
   }
 
 
+  @Test
+  public void testAddPrefixAcl() throws Exception {
+    OzoneBucket ozoneBucket = setupBucket();
+    String remoteUserName = "remoteUser";
+    String prefixName = RandomStringUtils.randomAlphabetic(5) +"/";
+    OzoneAcl defaultUserAcl = new OzoneAcl(USER, remoteUserName,
+        READ, DEFAULT);
+
+    OzoneObj ozoneObj = OzoneObjInfo.Builder.newBuilder()
+        .setResType(OzoneObj.ResourceType.PREFIX)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .setVolumeName(ozoneBucket.getVolumeName())
+        .setBucketName(ozoneBucket.getName())
+        .setPrefixName(prefixName).build();
+
+    testAddAcl(remoteUserName, ozoneObj, defaultUserAcl);
+  }
+  @Test
+  public void testRemovePrefixAcl() throws Exception {
+    OzoneBucket ozoneBucket = setupBucket();
+    String remoteUserName = "remoteUser";
+    String prefixName = RandomStringUtils.randomAlphabetic(5) +"/";
+    OzoneAcl userAcl = new OzoneAcl(USER, remoteUserName,
+        READ, ACCESS);
+    OzoneAcl userAcl1 = new OzoneAcl(USER, "remote",
+        READ, ACCESS);
+
+    OzoneObj ozoneObj = OzoneObjInfo.Builder.newBuilder()
+        .setResType(OzoneObj.ResourceType.PREFIX)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .setVolumeName(ozoneBucket.getVolumeName())
+        .setBucketName(ozoneBucket.getName())
+        .setPrefixName(prefixName).build();
+
+    boolean result = objectStore.addAcl(ozoneObj, userAcl);
+    Assert.assertTrue(result);
+
+    result = objectStore.addAcl(ozoneObj, userAcl1);
+    Assert.assertTrue(result);
+
+    result = objectStore.removeAcl(ozoneObj, userAcl);
+    Assert.assertTrue(result);
+
+    // try removing already removed acl.
+    result = objectStore.removeAcl(ozoneObj, userAcl);
+    Assert.assertFalse(result);
+
+    result = objectStore.removeAcl(ozoneObj, userAcl1);
+    Assert.assertTrue(result);
+
+  }
+
+  @Test
+  public void testSetPrefixAcl() throws Exception {
+    OzoneBucket ozoneBucket = setupBucket();
+    String remoteUserName = "remoteUser";
+    String prefixName = RandomStringUtils.randomAlphabetic(5) +"/";
+    OzoneAcl defaultUserAcl = new OzoneAcl(USER, remoteUserName,
+        READ, DEFAULT);
+
+    OzoneObj ozoneObj = OzoneObjInfo.Builder.newBuilder()
+        .setResType(OzoneObj.ResourceType.PREFIX)
+        .setStoreType(OzoneObj.StoreType.OZONE)
+        .setVolumeName(ozoneBucket.getVolumeName())
+        .setBucketName(ozoneBucket.getName())
+        .setPrefixName(prefixName).build();
+
+    testSetAcl(remoteUserName, ozoneObj, defaultUserAcl);
+  }
+
 
 
   private void testSetAcl(String remoteUserName, OzoneObj ozoneObj,
   private void testSetAcl(String remoteUserName, OzoneObj ozoneObj,
       OzoneAcl userAcl) throws Exception {
       OzoneAcl userAcl) throws Exception {
     // As by default create will add some default acls in RpcClient.
     // As by default create will add some default acls in RpcClient.
-    List<OzoneAcl> acls = objectStore.getAcl(ozoneObj);
 
 
-    Assert.assertTrue(acls.size() > 0);
+    if (!ozoneObj.getResourceType().name().equals(
+        OzoneObj.ResourceType.PREFIX.name())) {
+      List<OzoneAcl> acls = objectStore.getAcl(ozoneObj);
+
+      Assert.assertTrue(acls.size() > 0);
+    }
 
 
     OzoneAcl modifiedUserAcl = new OzoneAcl(USER, remoteUserName,
     OzoneAcl modifiedUserAcl = new OzoneAcl(USER, remoteUserName,
         WRITE, DEFAULT);
         WRITE, DEFAULT);

+ 1 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneNativeAuthorizer.java

@@ -150,7 +150,7 @@ public class TestOzoneNativeAuthorizer {
     metadataManager = new OmMetadataManagerImpl(ozConfig);
     metadataManager = new OmMetadataManagerImpl(ozConfig);
     volumeManager = new VolumeManagerImpl(metadataManager, ozConfig);
     volumeManager = new VolumeManagerImpl(metadataManager, ozConfig);
     bucketManager = new BucketManagerImpl(metadataManager);
     bucketManager = new BucketManagerImpl(metadataManager);
-    prefixManager = new PrefixManagerImpl(metadataManager);
+    prefixManager = new PrefixManagerImpl(metadataManager, false);
 
 
     NodeManager nodeManager = new MockNodeManager(true, 10);
     NodeManager nodeManager = new MockNodeManager(true, 10);
     SCMConfigurator configurator = new SCMConfigurator();
     SCMConfigurator configurator = new SCMConfigurator();

+ 1 - 1
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java

@@ -445,7 +445,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       delegationTokenMgr = createDelegationTokenSecretManager(configuration);
       delegationTokenMgr = createDelegationTokenSecretManager(configuration);
     }
     }
 
 
-    prefixManager = new PrefixManagerImpl(metadataManager);
+    prefixManager = new PrefixManagerImpl(metadataManager, isRatisEnabled);
     keyManager = new KeyManagerImpl(this, scmClient, configuration,
     keyManager = new KeyManagerImpl(this, scmClient, configuration,
         omStorage.getOmId());
         omStorage.getOmId());
 
 

+ 212 - 114
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/PrefixManagerImpl.java

@@ -33,10 +33,12 @@ import org.slf4j.LoggerFactory;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.List;
 import java.util.List;
 import java.util.Objects;
 import java.util.Objects;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
 
 
+import static org.apache.hadoop.ozone.OzoneAcl.ZERO_BITSET;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.PREFIX_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.PREFIX_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
@@ -44,7 +46,7 @@ import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.PREFIX_L
 import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.PREFIX;
 import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.PREFIX;
 
 
 /**
 /**
- * Implementation of PreManager.
+ * Implementation of PrefixManager.
  */
  */
 public class PrefixManagerImpl implements PrefixManager {
 public class PrefixManagerImpl implements PrefixManager {
   private static final Logger LOG =
   private static final Logger LOG =
@@ -56,7 +58,13 @@ public class PrefixManagerImpl implements PrefixManager {
   // In-memory prefix tree to optimize ACL evaluation
   // In-memory prefix tree to optimize ACL evaluation
   private RadixTree<OmPrefixInfo> prefixTree;
   private RadixTree<OmPrefixInfo> prefixTree;
 
 
-  public PrefixManagerImpl(OMMetadataManager metadataManager) {
+  // TODO: This isRatisEnabled check will be removed as part of HDDS-1909,
+  //  where we integrate both HA and Non-HA code.
+  private boolean isRatisEnabled;
+
+  public PrefixManagerImpl(OMMetadataManager metadataManager,
+      boolean isRatisEnabled) {
+    this.isRatisEnabled = isRatisEnabled;
     this.metadataManager = metadataManager;
     this.metadataManager = metadataManager;
     loadPrefixTree();
     loadPrefixTree();
   }
   }
@@ -99,39 +107,10 @@ public class PrefixManagerImpl implements PrefixManager {
     try {
     try {
       OmPrefixInfo prefixInfo =
       OmPrefixInfo prefixInfo =
           metadataManager.getPrefixTable().get(prefixPath);
           metadataManager.getPrefixTable().get(prefixPath);
-      List<OzoneAcl> list = null;
-      if (prefixInfo != null) {
-        list = prefixInfo.getAcls();
-      }
 
 
-      if (list == null) {
-        list = new ArrayList<>();
-        list.add(acl);
-      } else {
-        boolean found = false;
-        for (OzoneAcl a: list) {
-          if (a.getName().equals(acl.getName()) &&
-              a.getType() == acl.getType()) {
-            found = true;
-            a.getAclBitSet().or(acl.getAclBitSet());
-            break;
-          }
-        }
-        if (!found) {
-          list.add(acl);
-        }
-      }
+      OMPrefixAclOpResult omPrefixAclOpResult = addAcl(obj, acl, prefixInfo);
 
 
-      OmPrefixInfo.Builder upiBuilder = OmPrefixInfo.newBuilder();
-      upiBuilder.setName(prefixPath).setAcls(list);
-      if (prefixInfo != null && prefixInfo.getMetadata() != null) {
-        upiBuilder.addAllMetadata(prefixInfo.getMetadata());
-      }
-      prefixInfo = upiBuilder.build();
-      // Persist into prefix table first
-      metadataManager.getPrefixTable().put(prefixPath, prefixInfo);
-      // update the in-memory prefix tree
-      prefixTree.insert(prefixPath, prefixInfo);
+      return omPrefixAclOpResult.isOperationsResult();
     } catch (IOException ex) {
     } catch (IOException ex) {
       if (!(ex instanceof OMException)) {
       if (!(ex instanceof OMException)) {
         LOG.error("Add acl operation failed for prefix path:{} acl:{}",
         LOG.error("Add acl operation failed for prefix path:{} acl:{}",
@@ -141,7 +120,6 @@ public class PrefixManagerImpl implements PrefixManager {
     } finally {
     } finally {
       metadataManager.getLock().releaseLock(PREFIX_LOCK, prefixPath);
       metadataManager.getLock().releaseLock(PREFIX_LOCK, prefixPath);
     }
     }
-    return true;
   }
   }
 
 
   /**
   /**
@@ -160,47 +138,14 @@ public class PrefixManagerImpl implements PrefixManager {
     try {
     try {
       OmPrefixInfo prefixInfo =
       OmPrefixInfo prefixInfo =
           metadataManager.getPrefixTable().get(prefixPath);
           metadataManager.getPrefixTable().get(prefixPath);
-      List<OzoneAcl> list = null;
-      if (prefixInfo != null) {
-        list = prefixInfo.getAcls();
-      }
-
-      if (list == null) {
-        LOG.debug("acl {} does not exist for prefix path {}", acl, prefixPath);
-        return false;
-      }
+      OMPrefixAclOpResult omPrefixAclOpResult = removeAcl(obj, acl, prefixInfo);
 
 
-      boolean found = false;
-      for (OzoneAcl a: list) {
-        if (a.getName().equals(acl.getName())
-            && a.getType() == acl.getType()) {
-          found = true;
-          a.getAclBitSet().andNot(acl.getAclBitSet());
-          if (a.getAclBitSet().isEmpty()) {
-            list.remove(a);
-          }
-          break;
-        }
-      }
-      if (!found) {
-        LOG.debug("acl {} does not exist for prefix path {}", acl, prefixPath);
+      if (!omPrefixAclOpResult.isOperationsResult()) {
+        LOG.debug("acl {} does not exist for prefix path {} ", acl, prefixPath);
         return false;
         return false;
       }
       }
 
 
-      if (!list.isEmpty()) {
-        OmPrefixInfo.Builder upiBuilder = OmPrefixInfo.newBuilder();
-        upiBuilder.setName(prefixPath).setAcls(list);
-        if (prefixInfo != null && prefixInfo.getMetadata() != null) {
-          upiBuilder.addAllMetadata(prefixInfo.getMetadata());
-        }
-        prefixInfo = upiBuilder.build();
-        metadataManager.getPrefixTable().put(prefixPath, prefixInfo);
-        prefixTree.insert(prefixPath, prefixInfo);
-      } else {
-        // Remove prefix entry in table and prefix tree if the # of acls is 0
-        metadataManager.getPrefixTable().delete(prefixPath);
-        prefixTree.removePrefixPath(prefixPath);
-      }
+      return omPrefixAclOpResult.isOperationsResult();
 
 
     } catch (IOException ex) {
     } catch (IOException ex) {
       if (!(ex instanceof OMException)) {
       if (!(ex instanceof OMException)) {
@@ -211,7 +156,6 @@ public class PrefixManagerImpl implements PrefixManager {
     } finally {
     } finally {
       metadataManager.getLock().releaseLock(PREFIX_LOCK, prefixPath);
       metadataManager.getLock().releaseLock(PREFIX_LOCK, prefixPath);
     }
     }
-    return true;
   }
   }
 
 
   /**
   /**
@@ -230,48 +174,10 @@ public class PrefixManagerImpl implements PrefixManager {
     try {
     try {
       OmPrefixInfo prefixInfo =
       OmPrefixInfo prefixInfo =
           metadataManager.getPrefixTable().get(prefixPath);
           metadataManager.getPrefixTable().get(prefixPath);
-      OmPrefixInfo.Builder upiBuilder = OmPrefixInfo.newBuilder();
-      List<OzoneAcl> aclsToBeSet = new ArrayList<>(acls.size());
-      aclsToBeSet.addAll(acls);
-      upiBuilder.setName(prefixPath);
-      if (prefixInfo != null && prefixInfo.getMetadata() != null) {
-        upiBuilder.addAllMetadata(prefixInfo.getMetadata());
-      }
-
-      // Inherit DEFAULT acls from prefix.
-      boolean prefixParentFound = false;
-      List<OmPrefixInfo> prefixList = getLongestPrefixPathHelper(
-          prefixTree.getLongestPrefix(prefixPath));
-
-      if (prefixList.size() > 0) {
-        // Add all acls from direct parent to key.
-        OmPrefixInfo parentPrefixInfo = prefixList.get(prefixList.size() - 1);
-        if (parentPrefixInfo != null) {
-          aclsToBeSet.addAll(OzoneUtils.getDefaultAcls(
-              parentPrefixInfo.getAcls()));
-          prefixParentFound = true;
-        }
-      }
 
 
-      // If no parent prefix is found inherit DEFULT acls from bucket.
-      if (!prefixParentFound) {
-        String bucketKey = metadataManager.getBucketKey(obj.getVolumeName(),
-            obj.getBucketName());
-        OmBucketInfo bucketInfo = metadataManager.getBucketTable().
-            get(bucketKey);
-        if (bucketInfo != null) {
-          bucketInfo.getAcls().forEach(a -> {
-            if (a.getAclScope().equals(OzoneAcl.AclScope.DEFAULT)) {
-              aclsToBeSet.add(new OzoneAcl(a.getType(), a.getName(),
-                  a.getAclBitSet(), OzoneAcl.AclScope.ACCESS));
-            }
-          });
-        }
-      }
+      OMPrefixAclOpResult omPrefixAclOpResult = setAcl(obj, acls, prefixInfo);
 
 
-      prefixInfo = upiBuilder.setAcls(aclsToBeSet).build();
-      prefixTree.insert(prefixPath, prefixInfo);
-      metadataManager.getPrefixTable().put(prefixPath, prefixInfo);
+      return omPrefixAclOpResult.isOperationsResult();
     } catch (IOException ex) {
     } catch (IOException ex) {
       if (!(ex instanceof OMException)) {
       if (!(ex instanceof OMException)) {
         LOG.error("Set prefix acl operation failed for prefix path:{} acls:{}",
         LOG.error("Set prefix acl operation failed for prefix path:{} acls:{}",
@@ -281,7 +187,6 @@ public class PrefixManagerImpl implements PrefixManager {
     } finally {
     } finally {
       metadataManager.getLock().releaseLock(PREFIX_LOCK, prefixPath);
       metadataManager.getLock().releaseLock(PREFIX_LOCK, prefixPath);
     }
     }
-    return true;
   }
   }
 
 
   /**
   /**
@@ -372,7 +277,7 @@ public class PrefixManagerImpl implements PrefixManager {
    * Helper method to validate ozone object.
    * Helper method to validate ozone object.
    * @param obj
    * @param obj
    * */
    * */
-  private void validateOzoneObj(OzoneObj obj) throws OMException {
+  public void validateOzoneObj(OzoneObj obj) throws OMException {
     Objects.requireNonNull(obj);
     Objects.requireNonNull(obj);
 
 
     if (!obj.getResourceType().equals(PREFIX)) {
     if (!obj.getResourceType().equals(PREFIX)) {
@@ -397,4 +302,197 @@ public class PrefixManagerImpl implements PrefixManager {
           PREFIX_NOT_FOUND);
           PREFIX_NOT_FOUND);
     }
     }
   }
   }
+
+  public OMPrefixAclOpResult addAcl(OzoneObj ozoneObj, OzoneAcl ozoneAcl,
+      OmPrefixInfo prefixInfo) throws IOException {
+
+    List<OzoneAcl> ozoneAclList = null;
+    if (prefixInfo != null) {
+      ozoneAclList = prefixInfo.getAcls();
+    }
+
+    if (ozoneAclList == null) {
+      ozoneAclList = new ArrayList<>();
+      ozoneAclList.add(ozoneAcl);
+    } else {
+      boolean addToExistingAcl = false;
+      for(OzoneAcl existingAcl: ozoneAclList) {
+        if(existingAcl.getName().equals(ozoneAcl.getName()) &&
+            existingAcl.getType().equals(ozoneAcl.getType())) {
+
+          BitSet bits = (BitSet) ozoneAcl.getAclBitSet().clone();
+
+          // We need to do "or" before comparision because think of a case like
+          // existing acl is 777 and newly added acl is 444, we have already
+          // that acl set. In this case if we do direct check they will not
+          // be equal, but if we do or and then check, we shall know it
+          // has acl's already set or not.
+          bits.or(existingAcl.getAclBitSet());
+
+          if (bits.equals(existingAcl.getAclBitSet())) {
+            return new OMPrefixAclOpResult(null, false);
+          } else {
+            existingAcl.getAclBitSet().or(ozoneAcl.getAclBitSet());
+            addToExistingAcl = true;
+            break;
+          }
+        }
+      }
+      if (!addToExistingAcl) {
+        ozoneAclList.add(ozoneAcl);
+      }
+    }
+    OmPrefixInfo.Builder upiBuilder = OmPrefixInfo.newBuilder();
+    upiBuilder.setName(ozoneObj.getPath()).setAcls(ozoneAclList);
+    if (prefixInfo != null && prefixInfo.getMetadata() != null) {
+      upiBuilder.addAllMetadata(prefixInfo.getMetadata());
+    }
+    prefixInfo = upiBuilder.build();
+
+    // update the in-memory prefix tree
+    prefixTree.insert(ozoneObj.getPath(), prefixInfo);
+
+    if (!isRatisEnabled) {
+      metadataManager.getPrefixTable().put(ozoneObj.getPath(), prefixInfo);
+    }
+    return new OMPrefixAclOpResult(prefixInfo, true);
+  }
+
+  public OMPrefixAclOpResult removeAcl(OzoneObj ozoneObj, OzoneAcl ozoneAcl,
+      OmPrefixInfo prefixInfo) throws IOException {
+    List<OzoneAcl> list = null;
+    if (prefixInfo != null) {
+      list = prefixInfo.getAcls();
+    }
+
+    if (list == null) {
+      return new OMPrefixAclOpResult(null, false);
+    }
+
+    boolean removed = false;
+    for (OzoneAcl existingAcl: list) {
+      if (existingAcl.getName().equals(ozoneAcl.getName())
+          && existingAcl.getType() == ozoneAcl.getType()) {
+        BitSet bits = (BitSet) ozoneAcl.getAclBitSet().clone();
+        bits.and(existingAcl.getAclBitSet());
+
+        // This happens when the acl bitset is not existing for current name
+        // and type.
+        // Like a case we have 444 permission, 333 is asked to removed.
+        if (bits.equals(ZERO_BITSET)) {
+          removed = false;
+          break;
+        }
+
+        // We have some matching. Remove them.
+        existingAcl.getAclBitSet().xor(bits);
+
+        // If existing acl has same bitset as passed acl bitset, remove that
+        // acl from the list
+        if (existingAcl.getAclBitSet().equals(ZERO_BITSET)) {
+          list.remove(existingAcl);
+        }
+        removed = true;
+        break;
+      }
+    }
+
+    // Nothing is matching to remove.
+    if (!removed) {
+      return new OMPrefixAclOpResult(null, false);
+    } else {
+      OmPrefixInfo.Builder upiBuilder = OmPrefixInfo.newBuilder();
+      upiBuilder.setName(ozoneObj.getPath()).setAcls(list);
+      if (prefixInfo != null && prefixInfo.getMetadata() != null) {
+        upiBuilder.addAllMetadata(prefixInfo.getMetadata());
+      }
+      prefixInfo = upiBuilder.build();
+
+      // Update in-memory prefix tree.
+      if (list.isEmpty()) {
+        prefixTree.removePrefixPath(ozoneObj.getPath());
+        if (!isRatisEnabled) {
+          metadataManager.getPrefixTable().delete(ozoneObj.getPath());
+        }
+      } else {
+        prefixTree.insert(ozoneObj.getPath(), prefixInfo);
+        if (!isRatisEnabled) {
+          metadataManager.getPrefixTable().put(ozoneObj.getPath(), prefixInfo);
+        }
+      }
+      return new OMPrefixAclOpResult(prefixInfo, true);
+    }
+  }
+
+  public OMPrefixAclOpResult setAcl(OzoneObj ozoneObj, List<OzoneAcl> ozoneAcls,
+      OmPrefixInfo prefixInfo) throws IOException {
+    OmPrefixInfo.Builder upiBuilder = OmPrefixInfo.newBuilder();
+    List<OzoneAcl> aclsToBeSet = new ArrayList<>(ozoneAcls.size());
+    aclsToBeSet.addAll(ozoneAcls);
+    upiBuilder.setName(ozoneObj.getPath());
+    if (prefixInfo != null && prefixInfo.getMetadata() != null) {
+      upiBuilder.addAllMetadata(prefixInfo.getMetadata());
+    }
+
+    // Inherit DEFAULT acls from prefix.
+    boolean prefixParentFound = false;
+    List<OmPrefixInfo> prefixList = getLongestPrefixPathHelper(
+        prefixTree.getLongestPrefix(ozoneObj.getPath()));
+
+    if (prefixList.size() > 0) {
+      // Add all acls from direct parent to key.
+      OmPrefixInfo parentPrefixInfo = prefixList.get(prefixList.size() - 1);
+      if (parentPrefixInfo != null) {
+        aclsToBeSet.addAll(OzoneUtils.getDefaultAcls(
+            parentPrefixInfo.getAcls()));
+        prefixParentFound = true;
+      }
+    }
+
+    // If no parent prefix is found inherit DEFULT acls from bucket.
+    if (!prefixParentFound) {
+      String bucketKey = metadataManager.getBucketKey(ozoneObj.getVolumeName(),
+          ozoneObj.getBucketName());
+      OmBucketInfo bucketInfo = metadataManager.getBucketTable().
+          get(bucketKey);
+      if (bucketInfo != null) {
+        bucketInfo.getAcls().forEach(a -> {
+          if (a.getAclScope().equals(OzoneAcl.AclScope.DEFAULT)) {
+            aclsToBeSet.add(new OzoneAcl(a.getType(), a.getName(),
+                a.getAclBitSet(), OzoneAcl.AclScope.ACCESS));
+          }
+        });
+      }
+    }
+
+    prefixInfo = upiBuilder.setAcls(aclsToBeSet).build();
+
+    prefixTree.insert(ozoneObj.getPath(), prefixInfo);
+    if (!isRatisEnabled) {
+      metadataManager.getPrefixTable().put(ozoneObj.getPath(), prefixInfo);
+    }
+    return new OMPrefixAclOpResult(prefixInfo, true);
+  }
+
+  /**
+   * Result of the prefix acl operation.
+   */
+  public static class OMPrefixAclOpResult {
+    private OmPrefixInfo omPrefixInfo;
+    private boolean operationsResult;
+
+    public OMPrefixAclOpResult(OmPrefixInfo omPrefixInfo,
+        boolean operationsResult) {
+      this.omPrefixInfo = omPrefixInfo;
+      this.operationsResult = operationsResult;
+    }
+
+    public OmPrefixInfo getOmPrefixInfo() {
+      return omPrefixInfo;
+    }
+
+    public boolean isOperationsResult() {
+      return operationsResult;
+    }
+  }
 }
 }

+ 10 - 3
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java

@@ -37,6 +37,9 @@ import org.apache.hadoop.ozone.om.request.key.OMKeyRenameRequest;
 import org.apache.hadoop.ozone.om.request.key.acl.OMKeyAddAclRequest;
 import org.apache.hadoop.ozone.om.request.key.acl.OMKeyAddAclRequest;
 import org.apache.hadoop.ozone.om.request.key.acl.OMKeyRemoveAclRequest;
 import org.apache.hadoop.ozone.om.request.key.acl.OMKeyRemoveAclRequest;
 import org.apache.hadoop.ozone.om.request.key.acl.OMKeySetAclRequest;
 import org.apache.hadoop.ozone.om.request.key.acl.OMKeySetAclRequest;
+import org.apache.hadoop.ozone.om.request.key.acl.prefix.OMPrefixAddAclRequest;
+import org.apache.hadoop.ozone.om.request.key.acl.prefix.OMPrefixRemoveAclRequest;
+import org.apache.hadoop.ozone.om.request.key.acl.prefix.OMPrefixSetAclRequest;
 import org.apache.hadoop.ozone.om.request.s3.bucket.S3BucketCreateRequest;
 import org.apache.hadoop.ozone.om.request.s3.bucket.S3BucketCreateRequest;
 import org.apache.hadoop.ozone.om.request.s3.bucket.S3BucketDeleteRequest;
 import org.apache.hadoop.ozone.om.request.s3.bucket.S3BucketDeleteRequest;
 import org.apache.hadoop.ozone.om.request.s3.multipart.S3InitiateMultipartUploadRequest;
 import org.apache.hadoop.ozone.om.request.s3.multipart.S3InitiateMultipartUploadRequest;
@@ -149,6 +152,8 @@ public final class OzoneManagerRatisUtils {
         return new OMBucketAddAclRequest(omRequest);
         return new OMBucketAddAclRequest(omRequest);
       } else if (ObjectType.KEY == type) {
       } else if (ObjectType.KEY == type) {
         return new OMKeyAddAclRequest(omRequest);
         return new OMKeyAddAclRequest(omRequest);
+      } else {
+        return new OMPrefixAddAclRequest(omRequest);
       }
       }
     } else if (Type.RemoveAcl == cmdType) {
     } else if (Type.RemoveAcl == cmdType) {
       ObjectType type = omRequest.getRemoveAclRequest().getObj().getResType();
       ObjectType type = omRequest.getRemoveAclRequest().getObj().getResType();
@@ -158,8 +163,10 @@ public final class OzoneManagerRatisUtils {
         return new OMBucketRemoveAclRequest(omRequest);
         return new OMBucketRemoveAclRequest(omRequest);
       } else if (ObjectType.KEY == type) {
       } else if (ObjectType.KEY == type) {
         return new OMKeyRemoveAclRequest(omRequest);
         return new OMKeyRemoveAclRequest(omRequest);
+      } else {
+        return new OMPrefixRemoveAclRequest(omRequest);
       }
       }
-    } else if (Type.SetAcl == cmdType) {
+    } else {
       ObjectType type = omRequest.getSetAclRequest().getObj().getResType();
       ObjectType type = omRequest.getSetAclRequest().getObj().getResType();
       if (ObjectType.VOLUME == type) {
       if (ObjectType.VOLUME == type) {
         return new OMVolumeSetAclRequest(omRequest);
         return new OMVolumeSetAclRequest(omRequest);
@@ -167,10 +174,10 @@ public final class OzoneManagerRatisUtils {
         return new OMBucketSetAclRequest(omRequest);
         return new OMBucketSetAclRequest(omRequest);
       } else if (ObjectType.KEY == type) {
       } else if (ObjectType.KEY == type) {
         return new OMKeySetAclRequest(omRequest);
         return new OMKeySetAclRequest(omRequest);
+      } else {
+        return new OMPrefixSetAclRequest(omRequest);
       }
       }
     }
     }
-    //TODO: handle key and prefix AddAcl
-    return null;
   }
   }
 
 
   /**
   /**

+ 197 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixAclRequest.java

@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.key.acl.prefix;
+
+import java.io.IOException;
+
+import com.google.common.base.Optional;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.PrefixManagerImpl;
+import org.apache.hadoop.ozone.om.PrefixManagerImpl.OMPrefixAclOpResult;
+import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.utils.db.cache.CacheKey;
+import org.apache.hadoop.utils.db.cache.CacheValue;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.PREFIX_LOCK;
+
+/**
+ * Base class for Prefix acl request.
+ */
+public abstract class OMPrefixAclRequest extends OMClientRequest {
+
+  public OMPrefixAclRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+      long transactionLogIndex,
+      OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
+
+
+    OmPrefixInfo omPrefixInfo = null;
+
+    OMResponse.Builder omResponse = onInit();
+    OMClientResponse omClientResponse = null;
+    IOException exception = null;
+
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    boolean lockAcquired = false;
+    String volume = null;
+    String bucket = null;
+    String key = null;
+    OMPrefixAclOpResult operationResult = null;
+    boolean result = false;
+
+    PrefixManagerImpl prefixManager =
+        (PrefixManagerImpl) ozoneManager.getPrefixManager();
+    try {
+      String prefixPath = getOzoneObj().getPath();
+
+      // check Acl
+      if (ozoneManager.getAclsEnabled()) {
+        checkAcls(ozoneManager, OzoneObj.ResourceType.PREFIX,
+            OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE_ACL,
+            volume, bucket, key);
+      }
+
+      lockAcquired =
+          omMetadataManager.getLock().acquireLock(PREFIX_LOCK, prefixPath);
+
+      omPrefixInfo = omMetadataManager.getPrefixTable().get(prefixPath);
+
+      try {
+        operationResult = apply(prefixManager, omPrefixInfo);
+      } catch (IOException ex) {
+        // In HA case this will never happen.
+        // As in add/remove/setAcl method we have logic to update database,
+        // that can throw exception. But in HA case we shall not update DB.
+        // The code in prefixManagerImpl is being done, because update
+        // in-memory should be done after DB update for Non-HA code path.
+        operationResult = new OMPrefixAclOpResult(null, false);
+      }
+
+      if (operationResult.isOperationsResult()) {
+        // As for remove acl list, for a prefix if after removing acl from
+        // the existing acl list, if list size becomes zero, delete the
+        // prefix from prefix table.
+        if (getOmRequest().hasRemoveAclRequest() &&
+            operationResult.getOmPrefixInfo().getAcls().size() == 0) {
+          omMetadataManager.getPrefixTable().addCacheEntry(
+              new CacheKey<>(prefixPath),
+              new CacheValue<>(Optional.absent(), transactionLogIndex));
+        } else {
+          // update cache.
+          omMetadataManager.getPrefixTable().addCacheEntry(
+              new CacheKey<>(prefixPath),
+              new CacheValue<>(Optional.of(operationResult.getOmPrefixInfo()),
+                  transactionLogIndex));
+        }
+      }
+
+      result  = operationResult.isOperationsResult();
+      omClientResponse = onSuccess(omResponse,
+          operationResult.getOmPrefixInfo(), result);
+
+    } catch (IOException ex) {
+      exception = ex;
+      omClientResponse = onFailure(omResponse, ex);
+    } finally {
+      if (omClientResponse != null) {
+        omClientResponse.setFlushFuture(
+            ozoneManagerDoubleBufferHelper.add(omClientResponse,
+                transactionLogIndex));
+      }
+      if (lockAcquired) {
+        omMetadataManager.getLock().releaseLock(PREFIX_LOCK,
+            getOzoneObj().getPath());
+      }
+    }
+
+    onComplete(result, exception, ozoneManager.getMetrics());
+
+    return omClientResponse;
+  }
+
+  /**
+   * Get the path name from the request.
+   * @return path name
+   */
+  abstract OzoneObj getOzoneObj();
+
+  // TODO: Finer grain metrics can be moved to these callbacks. They can also
+  // be abstracted into separate interfaces in future.
+  /**
+   * Get the initial om response builder with lock.
+   * @return om response builder.
+   */
+  abstract OMResponse.Builder onInit();
+
+  /**
+   * Get the om client response on success case with lock.
+   * @param omResponse
+   * @param omPrefixInfo
+   * @param operationResult
+   * @return OMClientResponse
+   */
+  abstract OMClientResponse onSuccess(
+      OMResponse.Builder omResponse, OmPrefixInfo omPrefixInfo,
+      boolean operationResult);
+
+  /**
+   * Get the om client response on failure case with lock.
+   * @param omResponse
+   * @param exception
+   * @return OMClientResponse
+   */
+  abstract OMClientResponse onFailure(OMResponse.Builder omResponse,
+      IOException exception);
+
+  /**
+   * Completion hook for final processing before return without lock.
+   * Usually used for logging without lock and metric update.
+   * @param operationResult
+   * @param exception
+   * @param omMetrics
+   */
+  abstract void onComplete(boolean operationResult, IOException exception,
+      OMMetrics omMetrics);
+
+  /**
+   * Apply the acl operation, if successfully completed returns true,
+   * else false.
+   * @param prefixManager
+   * @param omPrefixInfo
+   * @throws IOException
+   */
+  abstract OMPrefixAclOpResult apply(PrefixManagerImpl prefixManager,
+      OmPrefixInfo omPrefixInfo) throws IOException;
+
+
+}
+

+ 122 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixAddAclRequest.java

@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.key.acl.prefix;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.PrefixManagerImpl;
+import org.apache.hadoop.ozone.om.PrefixManagerImpl.OMPrefixAclOpResult;
+import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
+import org.apache.hadoop.ozone.om.response.key.acl.prefix.OMPrefixAclResponse;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .AddAclResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
+
+/**
+ * Handle add Acl request for prefix.
+ */
+public class OMPrefixAddAclRequest extends OMPrefixAclRequest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMPrefixAddAclRequest.class);
+
+  private OzoneObj ozoneObj;
+  private List<OzoneAcl> ozoneAcls;
+
+  public OMPrefixAddAclRequest(OMRequest omRequest) {
+    super(omRequest);
+    OzoneManagerProtocolProtos.AddAclRequest addAclRequest =
+        getOmRequest().getAddAclRequest();
+    // TODO: conversion of OzoneObj to protobuf can be avoided when we have
+    //  single code path for HA and Non-HA
+    ozoneObj = OzoneObjInfo.fromProtobuf(addAclRequest.getObj());
+    ozoneAcls = Lists.newArrayList(
+        OzoneAcl.fromProtobuf(addAclRequest.getAcl()));
+  }
+
+  @Override
+  OzoneObj getOzoneObj() {
+    return ozoneObj;
+  }
+
+  @Override
+  OMResponse.Builder onInit() {
+    return OMResponse.newBuilder().setCmdType(
+        OzoneManagerProtocolProtos.Type.AddAcl).setStatus(
+        OzoneManagerProtocolProtos.Status.OK).setSuccess(true);
+
+  }
+
+  @Override
+  OMClientResponse onSuccess(OMResponse.Builder omResponse,
+      OmPrefixInfo omPrefixInfo, boolean operationResult) {
+    omResponse.setSuccess(operationResult);
+    omResponse.setAddAclResponse(AddAclResponse.newBuilder()
+        .setResponse(operationResult));
+    return new OMPrefixAclResponse(omPrefixInfo,
+        omResponse.build());
+  }
+
+  @Override
+  OMClientResponse onFailure(OMResponse.Builder omResponse,
+      IOException exception) {
+    return new OMPrefixAclResponse(null,
+        createErrorOMResponse(omResponse, exception));
+  }
+
+  @Override
+  void onComplete(boolean operationResult, IOException exception,
+      OMMetrics omMetrics) {
+    if (operationResult) {
+      LOG.debug("Add acl: {} to path: {} success!", ozoneAcls,
+          ozoneObj.getPath());
+    } else {
+      omMetrics.incNumBucketUpdateFails();
+      if (exception == null) {
+        LOG.debug("Add acl {} to path {} failed, because acl already exist",
+            ozoneAcls, ozoneObj.getPath());
+      } else {
+        LOG.error("Add acl {} to path {} failed!", ozoneAcls,
+            ozoneObj.getPath(), exception);
+      }
+    }
+  }
+
+  @Override
+  OMPrefixAclOpResult apply(PrefixManagerImpl prefixManager,
+      OmPrefixInfo omPrefixInfo) throws IOException {
+    return prefixManager.addAcl(ozoneObj, ozoneAcls.get(0), omPrefixInfo);
+  }
+
+}
+

+ 119 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixRemoveAclRequest.java

@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.key.acl.prefix;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.PrefixManagerImpl;
+import org.apache.hadoop.ozone.om.PrefixManagerImpl.OMPrefixAclOpResult;
+import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
+import org.apache.hadoop.ozone.om.response.key.acl.prefix.OMPrefixAclResponse;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RemoveAclResponse;
+
+/**
+ * Handle add Acl request for prefix.
+ */
+public class OMPrefixRemoveAclRequest extends OMPrefixAclRequest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMPrefixAddAclRequest.class);
+
+  private OzoneObj ozoneObj;
+  private List<OzoneAcl> ozoneAcls;
+
+  public OMPrefixRemoveAclRequest(OMRequest omRequest) {
+    super(omRequest);
+    OzoneManagerProtocolProtos.RemoveAclRequest removeAclRequest =
+        getOmRequest().getRemoveAclRequest();
+    // TODO: conversion of OzoneObj to protobuf can be avoided when we have
+    //  single code path for HA and Non-HA
+    ozoneObj = OzoneObjInfo.fromProtobuf(removeAclRequest.getObj());
+    ozoneAcls = Lists.newArrayList(
+        OzoneAcl.fromProtobuf(removeAclRequest.getAcl()));
+  }
+
+  @Override
+  OzoneObj getOzoneObj() {
+    return ozoneObj;
+  }
+
+  @Override
+  OMResponse.Builder onInit() {
+    return OMResponse.newBuilder().setCmdType(
+        OzoneManagerProtocolProtos.Type.RemoveAcl).setStatus(
+        OzoneManagerProtocolProtos.Status.OK).setSuccess(true);
+
+  }
+
+  @Override
+  OMClientResponse onSuccess(OMResponse.Builder omResponse,
+      OmPrefixInfo omPrefixInfo, boolean operationResult) {
+    omResponse.setSuccess(operationResult);
+    omResponse.setRemoveAclResponse(RemoveAclResponse.newBuilder()
+        .setResponse(operationResult));
+    return new OMPrefixAclResponse(omPrefixInfo,
+        omResponse.build());
+  }
+
+  @Override
+  OMClientResponse onFailure(OMResponse.Builder omResponse,
+      IOException exception) {
+    return new OMPrefixAclResponse(null,
+        createErrorOMResponse(omResponse, exception));
+  }
+
+  @Override
+  void onComplete(boolean operationResult, IOException exception,
+      OMMetrics omMetrics) {
+    if (operationResult) {
+      LOG.debug("Remove acl: {} to path: {} success!", ozoneAcls,
+          ozoneObj.getPath());
+    } else {
+      omMetrics.incNumBucketUpdateFails();
+      if (exception == null) {
+        LOG.debug("Remove acl {} to path {} failed, because acl does not exist",
+            ozoneAcls, ozoneObj.getPath());
+      } else {
+        LOG.error("Remove acl {} to path {} failed!", ozoneAcls,
+            ozoneObj.getPath(), exception);
+      }
+    }
+  }
+
+  @Override
+  OMPrefixAclOpResult apply(PrefixManagerImpl prefixManager,
+      OmPrefixInfo omPrefixInfo) throws IOException {
+    return prefixManager.removeAcl(ozoneObj, ozoneAcls.get(0), omPrefixInfo);
+  }
+
+}
+

+ 120 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixSetAclRequest.java

@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.key.acl.prefix;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.PrefixManagerImpl;
+import org.apache.hadoop.ozone.om.PrefixManagerImpl.OMPrefixAclOpResult;
+import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
+import org.apache.hadoop.ozone.om.response.key.acl.prefix.OMPrefixAclResponse;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetAclResponse;
+
+/**
+ * Handle add Acl request for prefix.
+ */
+public class OMPrefixSetAclRequest extends OMPrefixAclRequest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMPrefixAddAclRequest.class);
+
+  private OzoneObj ozoneObj;
+  private List<OzoneAcl> ozoneAcls;
+
+  public OMPrefixSetAclRequest(OMRequest omRequest) {
+    super(omRequest);
+    OzoneManagerProtocolProtos.SetAclRequest setAclRequest =
+        getOmRequest().getSetAclRequest();
+    // TODO: conversion of OzoneObj to protobuf can be avoided when we have
+    //  single code path for HA and Non-HA
+    ozoneObj = OzoneObjInfo.fromProtobuf(setAclRequest.getObj());
+    ozoneAcls = new ArrayList<>();
+    setAclRequest.getAclList().forEach(aclInfo ->
+        ozoneAcls.add(OzoneAcl.fromProtobuf(aclInfo)));
+  }
+
+  @Override
+  OzoneObj getOzoneObj() {
+    return ozoneObj;
+  }
+
+  @Override
+  OMResponse.Builder onInit() {
+    return OMResponse.newBuilder().setCmdType(
+        OzoneManagerProtocolProtos.Type.SetAcl).setStatus(
+        OzoneManagerProtocolProtos.Status.OK).setSuccess(true);
+
+  }
+
+  @Override
+  OMClientResponse onSuccess(OMResponse.Builder omResponse,
+      OmPrefixInfo omPrefixInfo, boolean operationResult) {
+    omResponse.setSuccess(operationResult);
+    omResponse.setSetAclResponse(SetAclResponse.newBuilder()
+        .setResponse(operationResult));
+    return new OMPrefixAclResponse(omPrefixInfo,
+        omResponse.build());
+  }
+
+  @Override
+  OMClientResponse onFailure(OMResponse.Builder omResponse,
+      IOException exception) {
+    return new OMPrefixAclResponse(null,
+        createErrorOMResponse(omResponse, exception));
+  }
+
+  @Override
+  void onComplete(boolean operationResult, IOException exception,
+      OMMetrics omMetrics) {
+    if (operationResult) {
+      LOG.debug("Set acl: {} to path: {} success!", ozoneAcls,
+          ozoneObj.getPath());
+    } else {
+      omMetrics.incNumBucketUpdateFails();
+      if (exception == null) {
+        LOG.debug("Set acl {} to path {} failed", ozoneAcls,
+            ozoneObj.getPath());
+      } else {
+        LOG.error("Set acl {} to path {} failed!", ozoneAcls,
+            ozoneObj.getPath(), exception);
+      }
+    }
+  }
+
+  @Override
+  OMPrefixAclOpResult apply(PrefixManagerImpl prefixManager,
+      OmPrefixInfo omPrefixInfo) throws IOException {
+    return prefixManager.setAcl(ozoneObj, ozoneAcls, omPrefixInfo);
+  }
+
+}
+

+ 22 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/package-info.java

@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package contains classes related to acl requests for prefix.
+ */
+package org.apache.hadoop.ozone.om.request.key.acl.prefix;

+ 71 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/acl/prefix/OMPrefixAclResponse.java

@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.key.acl.prefix;
+
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.utils.db.BatchOperation;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+
+/**
+ * Response for Prefix Acl request.
+ */
+public class OMPrefixAclResponse extends OMClientResponse {
+  private final OmPrefixInfo prefixInfo;
+
+  public OMPrefixAclResponse(@Nullable OmPrefixInfo omPrefixInfo,
+      @Nonnull OzoneManagerProtocolProtos.OMResponse omResponse) {
+    super(omResponse);
+    this.prefixInfo = omPrefixInfo;
+  }
+
+  @Override
+  public void addToDBBatch(OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+
+    // If response status is OK and success is true, add to DB batch.
+    if (getOMResponse().getSuccess()) {
+      if ((getOMResponse().hasAddAclResponse()
+          && getOMResponse().getAddAclResponse().getResponse()) ||
+          (getOMResponse().hasSetAclResponse()
+              && getOMResponse().getSetAclResponse().getResponse())) {
+        omMetadataManager.getPrefixTable().putWithBatch(batchOperation,
+            prefixInfo.getName(), prefixInfo);
+      } else if ((getOMResponse().hasRemoveAclResponse()
+          && getOMResponse().getRemoveAclResponse().getResponse())) {
+        if (prefixInfo.getAcls().size() == 0) {
+          // if acl list size is zero delete.
+          omMetadataManager.getPrefixTable().deleteWithBatch(batchOperation,
+              prefixInfo.getName());
+        } else {
+          omMetadataManager.getPrefixTable().putWithBatch(batchOperation,
+              prefixInfo.getName(), prefixInfo);
+        }
+      }
+    }
+  }
+
+}
+
+

+ 24 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/acl/prefix/package-info.java

@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * Package contains classes related to prefix acl response.
+ */
+package org.apache.hadoop.ozone.om.response.key.acl.prefix;
+