Ver código fonte

HDDS-909 : Default implementation for Ozone acls.
Contributed by Ajay Kumar.

Anu Engineer 6 anos atrás
pai
commit
5e773efd78

+ 8 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java

@@ -346,6 +346,14 @@ public final class OzoneConfigKeys {
   public static final int OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES =
       1024 * 1024;
   public static final int OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE = 256 * 1024;
+  public static final String OZONE_ACL_AUTHORIZER_CLASS =
+      "ozone.acl.authorizer.class";
+  public static final String OZONE_ACL_AUTHORIZER_CLASS_DEFAULT =
+      "org.apache.hadoop.ozone.security.acl.OzoneAccessAuthorizer";
+  public static final String OZONE_ACL_ENABLED =
+      "ozone.acl.enabled";
+  public static final boolean OZONE_ACL_ENABLED_DEFAULT =
+      false;
 
   /**
    * There is no need to instantiate this class.

+ 16 - 1
hadoop-hdds/common/src/main/resources/ozone-default.xml

@@ -1138,7 +1138,8 @@
 
   <property>
     <name>ozone.tags.system</name>
-    <value>OZONE,MANAGEMENT,SECURITY,PERFORMANCE,DEBUG,CLIENT,SERVER,OM,SCM,CRITICAL,RATIS,CONTAINER,REQUIRED,REST,STORAGE,PIPELINE,STANDALONE,S3GATEWAY</value>
+    <value>OZONE,MANAGEMENT,SECURITY,PERFORMANCE,DEBUG,CLIENT,SERVER,OM,SCM,
+      CRITICAL,RATIS,CONTAINER,REQUIRED,REST,STORAGE,PIPELINE,STANDALONE,S3GATEWAY,ACL</value>
   </property>
 
 
@@ -1512,4 +1513,18 @@
     </description>
   </property>
 
+  <property>
+    <name>ozone.acl.authorizer.class</name>
+    <value>org.apache.hadoop.ozone.security.acl.OzoneAccessAuthorizer</value>
+    <tag>OZONE, SECURITY, ACL</tag>
+    <description>Acl authorizer for Ozone.
+    </description>
+  </property>
+  <property>
+    <name>ozone.acl.enabled</name>
+    <value>false</value>
+    <tag>OZONE, SECURITY, ACL</tag>
+    <description>Key to enable/disable ozone acls.</description>
+  </property>
+
 </configuration>

+ 29 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneAccessAuthorizer.java

@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.security.acl;
+
+/**
+ * Default implementation for {@link IAccessAuthorizer}.
+ * */
+public class OzoneAccessAuthorizer implements IAccessAuthorizer {
+
+  @Override
+  public boolean checkAccess(IOzoneObj ozoneObject, RequestContext context)
+      throws OzoneAclException {
+    return true;
+  }
+}

+ 18 - 2
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneAclException.java

@@ -16,12 +16,14 @@
  */
 package org.apache.hadoop.ozone.security.acl;
 
+import java.io.IOException;
+
 /**
  * Timeout exception thrown by Ozone. Ex: When checking ACLs for an Object if
  * security manager is not able to process the request in configured time than
  * {@link OzoneAclException} should be thrown.
  */
-public class OzoneAclException extends Exception {
+public class OzoneAclException extends IOException {
 
   private ErrorCode errorCode;
 
@@ -44,8 +46,22 @@ public class OzoneAclException extends Exception {
     this.errorCode = code;
   }
 
-  enum ErrorCode {
+  /**
+   * Constructs a new exception with {@code null} as its detail message. The
+   * cause is not initialized, and may subsequently be initialized by a call to
+   * {@link #initCause}.
+   */
+  public OzoneAclException(String errorMsg, ErrorCode code) {
+    super(errorMsg);
+    this.errorCode = code;
+  }
+
+  /**
+   * Error codes for OzoneAclException.
+   */
+  public enum ErrorCode {
     TIMEOUT,
+    PERMISSION_DENIED,
     OTHER
   }
 

+ 14 - 10
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObj.java

@@ -28,22 +28,28 @@ public abstract class OzoneObj implements IOzoneObj {
   private final ResourceType resType;
 
   private final StoreType storeType;
-  // Full path of resource.
-  private final String path;
 
-  OzoneObj(ResourceType resType, StoreType storeType, String path) {
-    Preconditions.checkNotNull(path);
+  OzoneObj(ResourceType resType, StoreType storeType) {
+
     Preconditions.checkNotNull(resType);
     Preconditions.checkNotNull(storeType);
     this.resType = resType;
     this.storeType = storeType;
-    this.path = path;
   }
 
   public ResourceType getResourceType() {
     return resType;
   }
 
+  @Override
+  public String toString() {
+    return "OzoneObj{" +
+        "resType=" + resType +
+        ", storeType=" + storeType +
+        ", path='" + getPath() + '\'' +
+        '}';
+  }
+
   public StoreType getStoreType() {
     return storeType;
   }
@@ -54,14 +60,12 @@ public abstract class OzoneObj implements IOzoneObj {
 
   public abstract String getKeyName();
 
-  public String getPath() {
-    return path;
-  }
+  public abstract String getPath();
 
   /**
    * Ozone Objects supported for ACL.
    */
-  enum ResourceType {
+  public enum ResourceType {
     VOLUME(OzoneConsts.VOLUME),
     BUCKET(OzoneConsts.BUCKET),
     KEY(OzoneConsts.KEY);
@@ -84,7 +88,7 @@ public abstract class OzoneObj implements IOzoneObj {
   /**
    * Ozone Objects supported for ACL.
    */
-  enum StoreType {
+  public enum StoreType {
     OZONE(OzoneConsts.OZONE),
     S3(OzoneConsts.S3);
 

+ 118 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/OzoneObjInfo.java

@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.security.acl;
+
+import org.apache.hadoop.ozone.OzoneConsts;
+
+/**
+ * Class representing an ozone object.
+ */
+public final class OzoneObjInfo extends OzoneObj {
+
+  private final String volumeName;
+  private final String bucketName;
+  private final String keyName;
+
+
+  private OzoneObjInfo(ResourceType resType, StoreType storeType,
+      String volumeName, String bucketName, String keyName) {
+    super(resType, storeType);
+    this.volumeName = volumeName;
+    this.bucketName = bucketName;
+    this.keyName = keyName;
+  }
+
+  @Override
+  public String getPath() {
+    switch (getResourceType()) {
+    case VOLUME:
+      return getVolumeName();
+    case BUCKET:
+      return getVolumeName() + OzoneConsts.OZONE_URI_DELIMITER
+          + getBucketName();
+    case KEY:
+      return getVolumeName() + OzoneConsts.OZONE_URI_DELIMITER
+          + getBucketName() + OzoneConsts.OZONE_URI_DELIMITER + getKeyName();
+    default:
+      throw new IllegalArgumentException("Unknown resource " +
+        "type" + getResourceType());
+    }
+
+  }
+
+  @Override
+  public String getVolumeName() {
+    return volumeName;
+  }
+
+  @Override
+  public String getBucketName() {
+    return bucketName;
+  }
+
+  @Override
+  public String getKeyName() {
+    return keyName;
+  }
+
+  /**
+   * Inner builder class.
+   */
+  public static class Builder {
+
+    private OzoneObj.ResourceType resType;
+    private OzoneObj.StoreType storeType;
+    private String volumeName;
+    private String bucketName;
+    private String keyName;
+
+    public static Builder newBuilder() {
+      return new Builder();
+    }
+
+    public Builder setResType(OzoneObj.ResourceType res) {
+      this.resType = res;
+      return this;
+    }
+
+    public Builder setStoreType(OzoneObj.StoreType store) {
+      this.storeType = store;
+      return this;
+    }
+
+    public Builder setVolumeName(String volume) {
+      this.volumeName = volume;
+      return this;
+    }
+
+    public Builder setBucketName(String bucket) {
+      this.bucketName = bucket;
+      return this;
+    }
+
+    public Builder setKeyName(String key) {
+      this.keyName = key;
+      return this;
+    }
+
+    public OzoneObjInfo build() {
+      return new OzoneObjInfo(resType, storeType, volumeName, bucketName,
+          keyName);
+    }
+  }
+
+}

+ 5 - 1
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/acl/RequestContext.java

@@ -33,7 +33,7 @@ public class RequestContext {
   private final ACLIdentityType aclType;
   private final ACLType aclRights;
 
-  RequestContext(String host, InetAddress ip,
+  public RequestContext(String host, InetAddress ip,
       UserGroupInformation clientUgi, String serviceId,
       ACLIdentityType aclType, ACLType aclRights) {
     this.host = host;
@@ -91,6 +91,10 @@ public class RequestContext {
     }
   }
 
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
   public String getHost() {
     return host;
   }

+ 87 - 0
hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/security/acl/TestOzoneObjInfo.java

@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.security.acl;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType;
+
+/**
+ * Test class for {@link OzoneObjInfo}.
+ * */
+public class TestOzoneObjInfo {
+
+  private OzoneObjInfo objInfo;
+  private OzoneObjInfo.Builder builder;
+  private String volume = "vol1";
+  private String bucket = "bucket1";
+  private String key = "key1";
+  private static final OzoneObj.StoreType STORE = OzoneObj.StoreType.OZONE;
+
+
+  @Test
+  public void testGetVolumeName() {
+
+    builder = getBuilder(volume, bucket, key);
+    objInfo = builder.build();
+    assertEquals(objInfo.getVolumeName(), volume);
+
+    objInfo = getBuilder(null, null, null).build();
+    assertEquals(objInfo.getVolumeName(), null);
+
+    objInfo = getBuilder(volume, null, null).build();
+    assertEquals(objInfo.getVolumeName(), volume);
+  }
+
+  private OzoneObjInfo.Builder getBuilder(String volume, String bucket,
+      String key) {
+    return OzoneObjInfo.Builder.newBuilder()
+        .setResType(ResourceType.VOLUME)
+        .setStoreType(STORE)
+        .setVolumeName(volume)
+        .setBucketName(bucket)
+        .setKeyName(key);
+  }
+
+  @Test
+  public void testGetBucketName() {
+    objInfo = getBuilder(volume, bucket, key).build();
+    assertEquals(objInfo.getBucketName(), bucket);
+
+    objInfo =getBuilder(volume, null, null).build();
+    assertEquals(objInfo.getBucketName(), null);
+
+    objInfo =getBuilder(null, bucket, null).build();
+    assertEquals(objInfo.getBucketName(), bucket);
+  }
+
+  @Test
+  public void testGetKeyName() {
+    objInfo = getBuilder(volume, bucket, key).build();
+    assertEquals(objInfo.getKeyName(), key);
+
+    objInfo =getBuilder(volume, null, null).build();
+    assertEquals(objInfo.getKeyName(), null);
+
+    objInfo =getBuilder(null, bucket, null).build();
+    assertEquals(objInfo.getKeyName(), null);
+
+    objInfo =getBuilder(null, null, key).build();
+    assertEquals(objInfo.getKeyName(), key);
+  }
+}

+ 176 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java

@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.IOzoneObj;
+import org.apache.hadoop.ozone.security.acl.OzoneAclException;
+import org.apache.hadoop.ozone.security.acl.RequestContext;
+import org.apache.hadoop.ozone.web.handlers.BucketArgs;
+import org.apache.hadoop.ozone.web.handlers.KeyArgs;
+import org.apache.hadoop.ozone.web.handlers.UserArgs;
+import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
+import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
+import org.apache.hadoop.ozone.web.request.OzoneQuota;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.UUID;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for Ozone Manager ACLs.
+ */
+public class TestOmAcls {
+
+  private static MiniOzoneCluster cluster = null;
+  private static StorageHandler storageHandler;
+  private static UserArgs userArgs;
+  private static OMMetrics omMetrics;
+  private static OzoneConfiguration conf;
+  private static String clusterId;
+  private static String scmId;
+  private static String omId;
+  private static GenericTestUtils.LogCapturer logCapturer;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    clusterId = UUID.randomUUID().toString();
+    scmId = UUID.randomUUID().toString();
+    omId = UUID.randomUUID().toString();
+    conf.setBoolean(OZONE_ACL_ENABLED, true);
+    conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
+    conf.setClass(OZONE_ACL_AUTHORIZER_CLASS, OzoneAccessAuthrizerTest.class,
+        IAccessAuthorizer.class);
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setClusterId(clusterId)
+        .setScmId(scmId)
+        .setOmId(omId)
+        .build();
+    cluster.waitForClusterToBeReady();
+    storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
+    userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
+        null, null, null, null);
+    omMetrics = cluster.getOzoneManager().getMetrics();
+    logCapturer =
+        GenericTestUtils.LogCapturer.captureLogs(OzoneManager.getLogger());
+  }
+
+  /**
+   * Shutdown MiniDFSCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+
+  /**
+   * Tests the OM Initialization.
+   */
+  @Test
+  public void testOMAclsPermissionDenied() throws Exception {
+    String user0 = "testListVolumes-user-0";
+    String adminUser = "testListVolumes-admin";
+    final VolumeArgs createVolumeArgs;
+    int i = 100;
+    String user0VolName = "Vol-" + user0 + "-" + i;
+    createVolumeArgs = new VolumeArgs(user0VolName, userArgs);
+    createVolumeArgs.setUserName(user0);
+    createVolumeArgs.setAdminName(adminUser);
+    createVolumeArgs.setQuota(new OzoneQuota(i, OzoneQuota.Units.GB));
+    logCapturer.clearOutput();
+    LambdaTestUtils.intercept(IOException.class, "Volume creation failed",
+        () -> storageHandler.createVolume(createVolumeArgs));
+    assertTrue(logCapturer.getOutput().contains("doesn't have CREATE " +
+        "permission to access volume"));
+
+    BucketArgs bucketArgs = new BucketArgs("bucket1", createVolumeArgs);
+    bucketArgs.setAddAcls(new LinkedList<>());
+    bucketArgs.setRemoveAcls(new LinkedList<>());
+    bucketArgs.setStorageType(StorageType.DISK);
+    LambdaTestUtils.intercept(IOException.class, "Bucket creation failed",
+        () -> storageHandler.createBucket(bucketArgs));
+    assertTrue(logCapturer.getOutput().contains("doesn't have CREATE " +
+        "permission to access bucket"));
+  }
+
+  @Test
+  public void testFailureInKeyOp() throws Exception {
+    final VolumeArgs createVolumeArgs;
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    createVolumeArgs = new VolumeArgs(userName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    createVolumeArgs.setQuota(new OzoneQuota(100, OzoneQuota.Units.GB));
+    BucketArgs bucketArgs = new BucketArgs("bucket1", createVolumeArgs);
+    bucketArgs.setAddAcls(new LinkedList<>());
+    bucketArgs.setRemoveAcls(new LinkedList<>());
+    bucketArgs.setStorageType(StorageType.DISK);
+    logCapturer.clearOutput();
+
+    // write a key without specifying size at all
+    String keyName = "testKey";
+    KeyArgs keyArgs = new KeyArgs(keyName, bucketArgs);
+    LambdaTestUtils.intercept(IOException.class, "Create key failed",
+        () -> storageHandler.newKeyWriter(keyArgs));
+    assertTrue(logCapturer.getOutput().contains("doesn't have READ permission" +
+        " to access key"));
+  }
+}
+
+/**
+ * Test implementation to negative case.
+ */
+class OzoneAccessAuthrizerTest implements IAccessAuthorizer {
+
+  @Override
+  public boolean checkAccess(IOzoneObj ozoneObject, RequestContext context)
+      throws OzoneAclException {
+    return false;
+  }
+}

+ 2 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java

@@ -87,6 +87,7 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConfigKeys
     .OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
@@ -122,6 +123,7 @@ public class TestOzoneManager {
     clusterId = UUID.randomUUID().toString();
     scmId = UUID.randomUUID().toString();
     omId = UUID.randomUUID().toString();
+    conf.setBoolean(OZONE_ACL_ENABLED, true);
     conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
     cluster =  MiniOzoneCluster.newBuilder(conf)
         .setClusterId(clusterId)

+ 180 - 3
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java

@@ -70,8 +70,20 @@ import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort;
 import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
+import org.apache.hadoop.ozone.security.acl.OzoneAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneAclException;
+import org.apache.hadoop.ozone.security.acl.OzoneAclException.ErrorCode;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.security.acl.OzoneObj.StoreType;
+import org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType;
+import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
+import org.apache.hadoop.ozone.security.acl.RequestContext;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.ratis.util.LifeCycle;
@@ -103,6 +115,9 @@ import static org.apache.hadoop.hdds.HddsUtils.isHddsEnabled;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
 import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
 import static org.apache.hadoop.ozone.OmUtils.getOmAddress;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_FILE;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_TEMP_FILE;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
@@ -159,6 +174,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private static final int SHUTDOWN_HOOK_PRIORITY = 30;
   private final Runnable shutdownHook;
   private final File omMetaDir;
+  private final boolean isAclEnabled;
+  private final IAccessAuthorizer accessAuthorizer;
 
   private OzoneManager(OzoneConfiguration conf) throws IOException {
     Preconditions.checkNotNull(conf);
@@ -198,11 +215,32 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     };
     ShutdownHookManager.get().addShutdownHook(shutdownHook,
         SHUTDOWN_HOOK_PRIORITY);
-
+    isAclEnabled = conf.getBoolean(OZONE_ACL_ENABLED,
+            OZONE_ACL_ENABLED_DEFAULT);
+    if (isAclEnabled) {
+      accessAuthorizer = getACLAuthorizerInstance(conf);
+    } else {
+      accessAuthorizer = null;
+    }
     omMetaDir = OmUtils.getOmDbDir(configuration);
 
   }
 
+  /**
+   * Returns an instance of {@link IAccessAuthorizer}.
+   * Looks up the configuration to see if there is custom class specified.
+   * Constructs the instance by passing the configuration directly to the
+   * constructor to achieve thread safety using final fields.
+   * @param conf
+   * @return IAccessAuthorizer
+   */
+  private IAccessAuthorizer getACLAuthorizerInstance(OzoneConfiguration conf) {
+    Class<? extends IAccessAuthorizer> clazz = conf.getClass(
+        OZONE_ACL_AUTHORIZER_CLASS, OzoneAccessAuthorizer.class,
+        IAccessAuthorizer.class);
+    return ReflectionUtils.newInstance(clazz, conf);
+  }
+
   /**
    * Class which schedule saving metrics to a file.
    */
@@ -646,6 +684,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   @Override
   public void createVolume(OmVolumeArgs args) throws IOException {
     try {
+      if(isAclEnabled) {
+        checkAcls(ResourceType.VOLUME, StoreType.OZONE,
+            ACLType.CREATE, args.getVolume(), null, null);
+      }
       metrics.incNumVolumeCreates();
       volumeManager.createVolume(args);
       AUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMAction.CREATE_VOLUME,
@@ -661,6 +703,46 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
   }
 
+  /**
+   * Checks if current caller has acl permissions.
+   *
+   * @param resType - Type of ozone resource. Ex volume, bucket.
+   * @param store   - Store type. i.e Ozone, S3.
+   * @param acl     - type of access to be checked.
+   * @param vol     - name of volume
+   * @param bucket  - bucket name
+   * @param key     - key
+   * @throws OzoneAclException
+   */
+  private void checkAcls(ResourceType resType, StoreType store,
+      ACLType acl, String vol, String bucket, String key)
+      throws OzoneAclException {
+    if(!isAclEnabled) {
+      return;
+    }
+
+    OzoneObj obj = OzoneObjInfo.Builder.newBuilder()
+        .setResType(resType)
+        .setStoreType(store)
+        .setVolumeName(vol)
+        .setBucketName(bucket)
+        .setKeyName(key).build();
+    UserGroupInformation user = ProtobufRpcEngine.Server.getRemoteUser();
+    RequestContext context = RequestContext.newBuilder()
+        .setClientUgi(user)
+        .setIp(ProtobufRpcEngine.Server.getRemoteIp())
+        .setAclType(ACLIdentityType.USER)
+        .setAclRights(acl)
+        .build();
+    if (!accessAuthorizer.checkAccess(obj, context)) {
+      LOG.warn("User {} doesn't have {} permission to access {}",
+          user.getUserName(), acl, resType);
+      throw new OzoneAclException("User " + user.getUserName() + " doesn't " +
+          "have " + acl + " permission to access " + resType,
+          ErrorCode.PERMISSION_DENIED);
+    }
+  }
+
   /**
    * Changes the owner of a volume.
    *
@@ -670,6 +752,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    */
   @Override
   public void setOwner(String volume, String owner) throws IOException {
+    if(isAclEnabled) {
+      checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.WRITE_ACL, volume,
+          null, null);
+    }
     Map<String, String> auditMap = buildAuditMap(volume);
     auditMap.put(OzoneConsts.OWNER, owner);
     try {
@@ -695,6 +781,11 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    */
   @Override
   public void setQuota(String volume, long quota) throws IOException {
+    if(isAclEnabled) {
+      checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.WRITE, volume,
+          null, null);
+    }
+
     Map<String, String> auditMap = buildAuditMap(volume);
     auditMap.put(OzoneConsts.QUOTA, String.valueOf(quota));
     try {
@@ -722,6 +813,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   @Override
   public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
       throws IOException {
+    if(isAclEnabled) {
+      checkAcls(ResourceType.VOLUME, StoreType.OZONE,
+          ACLType.READ, volume, null, null);
+    }
     boolean auditSuccess = true;
     Map<String, String> auditMap = buildAuditMap(volume);
     auditMap.put(OzoneConsts.USER_ACL,
@@ -752,6 +847,11 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    */
   @Override
   public OmVolumeArgs getVolumeInfo(String volume) throws IOException {
+    if(isAclEnabled) {
+      checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.READ, volume,
+          null, null);
+    }
+
     boolean auditSuccess = true;
     Map<String, String> auditMap = buildAuditMap(volume);
     try {
@@ -780,6 +880,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   @Override
   public void deleteVolume(String volume) throws IOException {
     try {
+      if(isAclEnabled) {
+        checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.DELETE, volume,
+            null, null);
+      }
       metrics.incNumVolumeDeletes();
       volumeManager.deleteVolume(volume);
       AUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMAction.DELETE_VOLUME,
@@ -807,6 +911,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   @Override
   public List<OmVolumeArgs> listVolumeByUser(String userName, String prefix,
       String prevKey, int maxKeys) throws IOException {
+    if(isAclEnabled) {
+      checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.LIST, prefix,
+          null, null);
+    }
     boolean auditSuccess = true;
     Map<String, String> auditMap = new LinkedHashMap<>();
     auditMap.put(OzoneConsts.PREV_KEY, prevKey);
@@ -843,6 +951,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   @Override
   public List<OmVolumeArgs> listAllVolumes(String prefix, String prevKey, int
       maxKeys) throws IOException {
+    if(isAclEnabled) {
+      checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.LIST, prefix,
+          null, null);
+    }
     boolean auditSuccess = true;
     Map<String, String> auditMap = new LinkedHashMap<>();
     auditMap.put(OzoneConsts.PREV_KEY, prevKey);
@@ -875,6 +987,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   @Override
   public void createBucket(OmBucketInfo bucketInfo) throws IOException {
     try {
+      if(isAclEnabled) {
+        checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.CREATE,
+            bucketInfo.getVolumeName(), bucketInfo.getBucketName(), null);
+      }
       metrics.incNumBucketCreates();
       bucketManager.createBucket(bucketInfo);
       AUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMAction.CREATE_BUCKET,
@@ -895,6 +1011,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   public List<OmBucketInfo> listBuckets(String volumeName,
       String startKey, String prefix, int maxNumOfBuckets)
       throws IOException {
+    if(isAclEnabled) {
+      checkAcls(ResourceType.VOLUME, StoreType.OZONE, ACLType.LIST, volumeName,
+          null, null);
+    }
     boolean auditSuccess = true;
     Map<String, String> auditMap = buildAuditMap(volumeName);
     auditMap.put(OzoneConsts.START_KEY, startKey);
@@ -930,6 +1050,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   @Override
   public OmBucketInfo getBucketInfo(String volume, String bucket)
       throws IOException {
+    if(isAclEnabled) {
+      checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.READ, volume,
+          bucket, null);
+    }
     boolean auditSuccess = true;
     Map<String, String> auditMap = buildAuditMap(volume);
     auditMap.put(OzoneConsts.BUCKET, bucket);
@@ -959,6 +1083,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    */
   @Override
   public OpenKeySession openKey(OmKeyArgs args) throws IOException {
+    if(isAclEnabled) {
+      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.READ,
+          args.getVolumeName(), args.getBucketName(), args.getKeyName());
+    }
     boolean auditSuccess = true;
     try {
       metrics.incNumKeyAllocates();
@@ -980,6 +1108,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   @Override
   public void commitKey(OmKeyArgs args, long clientID)
       throws IOException {
+    if(isAclEnabled) {
+      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.WRITE,
+          args.getVolumeName(), args.getBucketName(), args.getKeyName());
+    }
     Map<String, String> auditMap = (args == null) ? new LinkedHashMap<>() :
         args.toAuditMap();
     auditMap.put(OzoneConsts.CLIENT_ID, String.valueOf(clientID));
@@ -1009,6 +1141,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   @Override
   public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID)
       throws IOException {
+    if(isAclEnabled) {
+      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.WRITE,
+          args.getVolumeName(), args.getBucketName(), args.getKeyName());
+    }
     boolean auditSuccess = true;
     Map<String, String> auditMap = (args == null) ? new LinkedHashMap<>() :
         args.toAuditMap();
@@ -1039,6 +1175,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    */
   @Override
   public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException {
+    if(isAclEnabled) {
+      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.READ,
+          args.getVolumeName(), args.getBucketName(), args.getKeyName());
+    }
     boolean auditSuccess = true;
     try {
       metrics.incNumKeyLookups();
@@ -1059,6 +1199,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
 
   @Override
   public void renameKey(OmKeyArgs args, String toKeyName) throws IOException {
+    if(isAclEnabled) {
+      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.WRITE,
+          args.getVolumeName(), args.getBucketName(), args.getKeyName());
+    }
     Map<String, String> auditMap = (args == null) ? new LinkedHashMap<>() :
         args.toAuditMap();
     auditMap.put(OzoneConsts.TO_KEY_NAME, toKeyName);
@@ -1084,6 +1228,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   @Override
   public void deleteKey(OmKeyArgs args) throws IOException {
     try {
+      if(isAclEnabled) {
+        checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.DELETE,
+            args.getVolumeName(), args.getBucketName(), args.getKeyName());
+      }
       metrics.incNumKeyDeletes();
       keyManager.deleteKey(args);
       AUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMAction.DELETE_KEY,
@@ -1100,6 +1248,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   @Override
   public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
       String startKey, String keyPrefix, int maxKeys) throws IOException {
+    if(isAclEnabled) {
+      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.LIST, volumeName,
+          bucketName, keyPrefix);
+    }
     boolean auditSuccess = true;
     Map<String, String> auditMap = buildAuditMap(volumeName);
     auditMap.put(OzoneConsts.BUCKET, bucketName);
@@ -1133,6 +1285,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   @Override
   public void setBucketProperty(OmBucketArgs args)
       throws IOException {
+    if(isAclEnabled) {
+      checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.WRITE,
+          args.getVolumeName(), args.getBucketName(), null);
+    }
     try {
       metrics.incNumBucketUpdates();
       bucketManager.setBucketProperty(args);
@@ -1155,6 +1311,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    */
   @Override
   public void deleteBucket(String volume, String bucket) throws IOException {
+    checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.WRITE, volume,
+        bucket, null);
     Map<String, String> auditMap = buildAuditMap(volume);
     auditMap.put(OzoneConsts.BUCKET, bucket);
     try {
@@ -1302,6 +1460,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   public void createS3Bucket(String userName, String s3BucketName)
       throws IOException {
     try {
+      if(isAclEnabled) {
+        checkAcls(ResourceType.BUCKET, StoreType.S3, ACLType.CREATE,
+            null, s3BucketName, null);
+      }
       metrics.incNumBucketCreates();
       try {
         boolean newVolumeCreate = s3BucketManager.createOzoneVolumeIfNeeded(
@@ -1331,9 +1493,12 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   /**
    * {@inheritDoc}
    */
-  public void deleteS3Bucket(String s3BucketName)
-      throws IOException {
+  public void deleteS3Bucket(String s3BucketName) throws IOException {
     try {
+      if(isAclEnabled) {
+        checkAcls(ResourceType.BUCKET, StoreType.S3, ACLType.DELETE, null,
+            s3BucketName, null);
+      }
       metrics.incNumBucketDeletes();
       s3BucketManager.deleteS3Bucket(s3BucketName);
       metrics.decNumBuckets();
@@ -1348,6 +1513,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    */
   public String getOzoneBucketMapping(String s3BucketName)
       throws IOException {
+    if(isAclEnabled) {
+      checkAcls(ResourceType.BUCKET, StoreType.S3, ACLType.READ,
+          null, s3BucketName, null);
+    }
     return s3BucketManager.getOzoneBucketMapping(s3BucketName);
   }
 
@@ -1355,6 +1524,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   public List<OmBucketInfo> listS3Buckets(String userName, String startKey,
                                           String prefix, int maxNumOfBuckets)
       throws IOException {
+    if(isAclEnabled) {
+      checkAcls(ResourceType.VOLUME, StoreType.S3, ACLType.LIST,
+          s3BucketManager.getOzoneVolumeNameForUser(userName), null, null);
+    }
     boolean auditSuccess = true;
     Map<String, String> auditMap = buildAuditMap(userName);
     auditMap.put(OzoneConsts.START_KEY, startKey);
@@ -1410,4 +1583,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       return name;
     }
   }
+
+  public static  Logger getLogger() {
+    return LOG;
+  }
 }