浏览代码

HDFS-11778. Ozone: KSM: add getBucketInfo. Contributed by Nandakumar Vadivelu.

Xiaoyu Yao 8 年之前
父节点
当前提交
84294de9a2
共有 15 个文件被更改,包括 615 次插入132 次删除
  1. 46 48
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmBucketArgs.java
  2. 207 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmBucketInfo.java
  3. 13 3
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java
  4. 41 5
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
  5. 28 2
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
  6. 12 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManager.java
  7. 45 12
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java
  8. 20 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
  9. 25 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
  10. 1 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
  11. 38 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KSMPBHelper.java
  12. 25 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java
  13. 33 23
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
  14. 44 18
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestBucketManagerImpl.java
  15. 37 10
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java

+ 46 - 48
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmBucketArgs.java

@@ -17,18 +17,16 @@
  */
 package org.apache.hadoop.ksm.helpers;
 
-import java.util.LinkedList;
 import java.util.List;
 
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.protocol.proto
+    .HdfsProtos.StorageTypeProto;
 import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.BucketInfo;
+    .KeySpaceManagerProtocolProtos.BucketArgs;
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.OzoneAclInfo;
 
-import com.google.common.base.Preconditions;
-
 /**
  * A class that encapsulates Bucket Arguments.
  */
@@ -52,12 +50,12 @@ public final class KsmBucketArgs {
   /**
    * Bucket Version flag.
    */
-  private boolean isVersionEnabled;
+  private Boolean isVersionEnabled;
   /**
    * Type of storage to be used for this bucket.
    * [RAM_DISK, SSD, DISK, ARCHIVE]
    */
-  private StorageType storageType;
+  private StorageTypeProto storageType;
 
   /**
    * Private constructor, constructed via builder.
@@ -70,7 +68,7 @@ public final class KsmBucketArgs {
    */
   private KsmBucketArgs(String volumeName, String bucketName,
       List<OzoneAclInfo> addAcls, List<OzoneAclInfo> removeAcls,
-      boolean isVersionEnabled, StorageType storageType) {
+      boolean isVersionEnabled, StorageTypeProto storageType) {
     this.volumeName = volumeName;
     this.bucketName = bucketName;
     this.addAcls = addAcls;
@@ -97,7 +95,7 @@ public final class KsmBucketArgs {
 
   /**
    * Returns the ACL's that are to be added.
-   * @return List<OzoneAcl>
+   * @return List<OzoneAclInfo>
    */
   public List<OzoneAclInfo> getAddAcls() {
     return addAcls;
@@ -105,7 +103,7 @@ public final class KsmBucketArgs {
 
   /**
    * Returns the ACL's that are to be removed.
-   * @return List<OzoneAcl>
+   * @return List<OzoneAclInfo>
    */
   public List<OzoneAclInfo> getRemoveAcls() {
     return removeAcls;
@@ -123,7 +121,7 @@ public final class KsmBucketArgs {
    * Returns the type of storage to be used.
    * @return StorageType
    */
-  public StorageType getStorageType() {
+  public StorageTypeProto getStorageType() {
     return storageType;
   }
 
@@ -144,13 +142,8 @@ public final class KsmBucketArgs {
     private String bucketName;
     private List<OzoneAclInfo> addAcls;
     private List<OzoneAclInfo> removeAcls;
-    private boolean isVersionEnabled;
-    private StorageType storageType;
-
-    Builder() {
-      addAcls = new LinkedList<>();
-      removeAcls = new LinkedList<>();
-    }
+    private Boolean isVersionEnabled;
+    private StorageTypeProto storageType;
 
     public Builder setVolumeName(String volume) {
       this.volumeName = volume;
@@ -162,22 +155,22 @@ public final class KsmBucketArgs {
       return this;
     }
 
-    public Builder addAddAcl(OzoneAclInfo acl) {
-      this.addAcls.add(acl);
+    public Builder setAddAcls(List<OzoneAclInfo> acls) {
+      this.addAcls = acls;
       return this;
     }
 
-    public Builder addRemoveAcl(OzoneAclInfo acl) {
-      this.removeAcls.add(acl);
+    public Builder setRemoveAcls(List<OzoneAclInfo> acls) {
+      this.removeAcls = acls;
       return this;
     }
 
-    public Builder setIsVersionEnabled(boolean versionFlag) {
+    public Builder setIsVersionEnabled(Boolean versionFlag) {
       this.isVersionEnabled = versionFlag;
       return this;
     }
 
-    public Builder setStorageType(StorageType storage) {
+    public Builder setStorageType(StorageTypeProto storage) {
       this.storageType = storage;
       return this;
     }
@@ -189,39 +182,44 @@ public final class KsmBucketArgs {
     public KsmBucketArgs build() {
       Preconditions.checkNotNull(volumeName);
       Preconditions.checkNotNull(bucketName);
-      Preconditions.checkNotNull(isVersionEnabled);
-      return new KsmBucketArgs(volumeName, bucketName, addAcls, removeAcls,
-          isVersionEnabled, storageType);
+      return new KsmBucketArgs(volumeName, bucketName, addAcls,
+          removeAcls, isVersionEnabled, storageType);
     }
   }
 
   /**
-   * Creates BucketInfo protobuf from KsmBucketArgs.
+   * Creates BucketArgs protobuf from KsmBucketArgs.
    */
-  public BucketInfo getProtobuf() {
-    return BucketInfo.newBuilder()
-        .setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .addAllAddAcls(addAcls)
-        .addAllRemoveAcls(removeAcls)
-        .setIsVersionEnabled(isVersionEnabled)
-        .setStorageType(PBHelperClient.convertStorageType(storageType))
-        .build();
+  public BucketArgs getProtobuf() {
+    BucketArgs.Builder builder = BucketArgs.newBuilder();
+    builder.setVolumeName(volumeName)
+        .setBucketName(bucketName);
+    if(addAcls != null && !addAcls.isEmpty()) {
+      builder.addAllAddAcls(addAcls);
+    }
+    if(removeAcls != null && !removeAcls.isEmpty()) {
+      builder.addAllRemoveAcls(removeAcls);
+    }
+    if(isVersionEnabled != null) {
+      builder.setIsVersionEnabled(isVersionEnabled);
+    }
+    if(storageType != null) {
+      builder.setStorageType(storageType);
+    }
+    return builder.build();
   }
 
   /**
    * Parses BucketInfo protobuf and creates KsmBucketArgs.
-   * @param bucketInfo
+   * @param bucketArgs
    * @return instance of KsmBucketArgs
    */
-  public static KsmBucketArgs getFromProtobuf(BucketInfo bucketInfo) {
-    return new KsmBucketArgs(
-        bucketInfo.getVolumeName(),
-        bucketInfo.getBucketName(),
-        bucketInfo.getAddAclsList(),
-        bucketInfo.getRemoveAclsList(),
-        bucketInfo.getIsVersionEnabled(),
-        PBHelperClient.convertStorageType(
-            bucketInfo.getStorageType()));
+  public static KsmBucketArgs getFromProtobuf(BucketArgs bucketArgs) {
+    return new KsmBucketArgs(bucketArgs.getVolumeName(),
+        bucketArgs.getBucketName(),
+        bucketArgs.getAddAclsList(),
+        bucketArgs.getRemoveAclsList(),
+        bucketArgs.getIsVersionEnabled(),
+        bucketArgs.getStorageType());
   }
 }

+ 207 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmBucketInfo.java

@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ksm.helpers;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.protocol.proto
+    .HdfsProtos.StorageTypeProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.BucketInfo;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.OzoneAclInfo;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * A class that encapsulates Bucket Info.
+ */
+public final class KsmBucketInfo {
+  /**
+   * Name of the volume in which the bucket belongs to.
+   */
+  private final String volumeName;
+  /**
+   * Name of the bucket.
+   */
+  private final String bucketName;
+  /**
+   * ACL Information.
+   */
+  private List<OzoneAclInfo> acls;
+  /**
+   * Bucket Version flag.
+   */
+  private Boolean isVersionEnabled;
+  /**
+   * Type of storage to be used for this bucket.
+   * [RAM_DISK, SSD, DISK, ARCHIVE]
+   */
+  private StorageTypeProto storageType;
+
+  /**
+   * Private constructor, constructed via builder.
+   * @param volumeName - Volume name.
+   * @param bucketName - Bucket name.
+   * @param acls - list of ACLs.
+   * @param isVersionEnabled - Bucket version flag.
+   * @param storageType - Storage type to be used.
+   */
+  private KsmBucketInfo(String volumeName, String bucketName,
+                        List<OzoneAclInfo> acls, boolean isVersionEnabled,
+                        StorageTypeProto storageType) {
+    this.volumeName = volumeName;
+    this.bucketName = bucketName;
+    this.acls = acls;
+    this.isVersionEnabled = isVersionEnabled;
+    this.storageType = storageType;
+  }
+
+  /**
+   * Returns the Volume Name.
+   * @return String.
+   */
+  public String getVolumeName() {
+    return volumeName;
+  }
+
+  /**
+   * Returns the Bucket Name.
+   * @return String
+   */
+  public String getBucketName() {
+    return bucketName;
+  }
+
+  /**
+   * Returns the ACL's associated with this bucket.
+   * @return List<OzoneAclInfo>
+   */
+  public List<OzoneAclInfo> getAcls() {
+    return acls;
+  }
+
+  /**
+   * Returns true if bucket version is enabled, else false.
+   * @return isVersionEnabled
+   */
+  public boolean getIsVersionEnabled() {
+    return isVersionEnabled;
+  }
+
+  /**
+   * Returns the type of storage to be used.
+   * @return StorageTypeProto
+   */
+  public StorageTypeProto getStorageType() {
+    return storageType;
+  }
+
+  /**
+   * Returns new builder class that builds a KsmBucketInfo.
+   *
+   * @return Builder
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder for KsmBucketInfo.
+   */
+  public static class Builder {
+    private String volumeName;
+    private String bucketName;
+    private List<OzoneAclInfo> acls;
+    private Boolean isVersionEnabled;
+    private StorageTypeProto storageType;
+
+    Builder() {
+      //Default values
+      this.acls = new LinkedList<>();
+      this.isVersionEnabled = false;
+      this.storageType = StorageTypeProto.DISK;
+    }
+
+    public Builder setVolumeName(String volume) {
+      this.volumeName = volume;
+      return this;
+    }
+
+    public Builder setBucketName(String bucket) {
+      this.bucketName = bucket;
+      return this;
+    }
+
+    public Builder setAcls(List<OzoneAclInfo> listOfAcls) {
+      this.acls = listOfAcls;
+      return this;
+    }
+
+    public Builder setIsVersionEnabled(Boolean versionFlag) {
+      this.isVersionEnabled = versionFlag;
+      return this;
+    }
+
+    public Builder setStorageType(StorageTypeProto storage) {
+      this.storageType = storage;
+      return this;
+    }
+
+    /**
+     * Constructs the KsmBucketInfo.
+     * @return instance of KsmBucketInfo.
+     */
+    public KsmBucketInfo build() {
+      Preconditions.checkNotNull(volumeName);
+      Preconditions.checkNotNull(bucketName);
+      Preconditions.checkNotNull(acls);
+      Preconditions.checkNotNull(isVersionEnabled);
+      Preconditions.checkNotNull(storageType);
+      return new KsmBucketInfo(volumeName, bucketName, acls,
+          isVersionEnabled, storageType);
+    }
+  }
+
+  /**
+   * Creates BucketInfo protobuf from KsmBucketInfo.
+   */
+  public BucketInfo getProtobuf() {
+    return BucketInfo.newBuilder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .addAllAcls(acls)
+        .setIsVersionEnabled(isVersionEnabled)
+        .setStorageType(storageType)
+        .build();
+  }
+
+  /**
+   * Parses BucketInfo protobuf and creates KsmBucketInfo.
+   * @param bucketInfo
+   * @return instance of KsmBucketInfo
+   */
+  public static KsmBucketInfo getFromProtobuf(BucketInfo bucketInfo) {
+    return new KsmBucketInfo(
+        bucketInfo.getVolumeName(),
+        bucketInfo.getBucketName(),
+        bucketInfo.getAclsList(),
+        bucketInfo.getIsVersionEnabled(),
+        bucketInfo.getStorageType());
+  }
+}

+ 13 - 3
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java

@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.ksm.protocol;
 
-import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
+import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
 import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
 import java.io.IOException;
 import java.util.List;
@@ -98,9 +98,19 @@ public interface KeySpaceManagerProtocol {
 
   /**
    * Creates a bucket.
-   * @param args - Arguments to create Bucket.
+   * @param bucketInfo - BucketInfo to create Bucket.
    * @throws IOException
    */
-  void createBucket(KsmBucketArgs args) throws IOException;
+  void createBucket(KsmBucketInfo bucketInfo) throws IOException;
+
+  /**
+   * Gets the bucket information.
+   * @param volumeName - Volume name.
+   * @param bucketName - Bucket name.
+   * @return KsmBucketInfo or exception is thrown.
+   * @throws IOException
+   */
+  KsmBucketInfo getBucketInfo(String volumeName, String bucketName)
+      throws IOException;
 
 }

+ 41 - 5
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java

@@ -22,7 +22,7 @@ import com.google.protobuf.ServiceException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolTranslator;
-import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
+import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
 import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
 import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
 import org.apache.hadoop.ozone.protocol.proto
@@ -31,6 +31,10 @@ import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.CreateBucketRequest;
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.CreateBucketResponse;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.InfoBucketRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.InfoBucketResponse;
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.CreateVolumeRequest;
 import org.apache.hadoop.ozone.protocol.proto
@@ -254,15 +258,15 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB
   /**
    * Creates a bucket.
    *
-   * @param args - Arguments to create Bucket.
+   * @param bucketInfo - BucketInfo to create bucket.
    * @throws IOException
    */
   @Override
-  public void createBucket(KsmBucketArgs args) throws IOException {
+  public void createBucket(KsmBucketInfo bucketInfo) throws IOException {
     CreateBucketRequest.Builder req =
         CreateBucketRequest.newBuilder();
-    BucketInfo bucketInfo = args.getProtobuf();
-    req.setBucketInfo(bucketInfo);
+    BucketInfo bucketInfoProtobuf = bucketInfo.getProtobuf();
+    req.setBucketInfo(bucketInfoProtobuf);
 
     final CreateBucketResponse resp;
     try {
@@ -277,6 +281,38 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB
     }
   }
 
+  /**
+   * Gets the bucket information.
+   *
+   * @param volume - Volume name.
+   * @param bucket - Bucket name.
+   * @return KsmBucketInfo or exception is thrown.
+   * @throws IOException
+   */
+  @Override
+  public KsmBucketInfo getBucketInfo(String volume, String bucket)
+      throws IOException {
+    InfoBucketRequest.Builder req =
+        InfoBucketRequest.newBuilder();
+    req.setVolumeName(volume);
+    req.setBucketName(bucket);
+
+    final InfoBucketResponse resp;
+    try {
+      resp = rpcProxy.infoBucket(NULL_RPC_CONTROLLER,
+          req.build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+    if (resp.getStatus() == Status.OK) {
+      return KsmBucketInfo.getFromProtobuf(resp.getBucketInfo());
+    } else {
+      throw new IOException("Info Bucket failed, error: "
+          + resp.getStatus());
+    }
+  }
+
+
   /**
    * Return the proxy object underlying this protocol translator.
    *

+ 28 - 2
hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto

@@ -159,12 +159,20 @@ message ListVolumeResponse {
 }
 
 message BucketInfo {
+    required string volumeName = 1;
+    required string bucketName = 2;
+    repeated OzoneAclInfo acls = 3;
+    required bool isVersionEnabled = 4 [default = false];
+    required StorageTypeProto storageType = 5 [default = DISK];
+}
+
+message BucketArgs {
     required string volumeName = 1;
     required string bucketName = 2;
     repeated OzoneAclInfo addAcls = 3;
     repeated OzoneAclInfo removeAcls = 4;
-    required bool isVersionEnabled = 5 [default = false];
-    optional StorageTypeProto storageType = 6 [default = DISK];
+    optional bool isVersionEnabled = 5;
+    optional StorageTypeProto storageType = 6;
 }
 
 message OzoneAclInfo {
@@ -190,6 +198,18 @@ message CreateBucketRequest {
 message CreateBucketResponse {
     required Status status = 1;
 }
+
+message InfoBucketRequest {
+    required string volumeName = 1;
+    required string bucketName = 2;
+}
+
+message InfoBucketResponse {
+    required Status status = 1;
+    optional BucketInfo bucketInfo = 2;
+
+}
+
 /**
  The KSM service that takes care of Ozone namespace.
 */
@@ -235,4 +255,10 @@ service KeySpaceManagerService {
     */
     rpc createBucket(CreateBucketRequest)
         returns(CreateBucketResponse);
+
+    /**
+        Get Bucket information.
+    */
+    rpc infoBucket(InfoBucketRequest)
+        returns(InfoBucketResponse);
 }

+ 12 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManager.java

@@ -16,8 +16,9 @@
  */
 package org.apache.hadoop.ozone.ksm;
 
-import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
-import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
+import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
+
+import java.io.IOException;
 
 /**
  * BucketManager handles all the bucket level operations.
@@ -25,7 +26,14 @@ import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
 public interface BucketManager {
   /**
    * Creates a bucket.
-   * @param args - KsmBucketArgs for creating bucket.
+   * @param bucketInfo - KsmBucketInfo for creating bucket.
+   */
+  void createBucket(KsmBucketInfo bucketInfo) throws IOException;
+  /**
+   * Returns Bucket Information.
+   * @param volumeName - Name of the Volume.
+   * @param bucketName - Name of the Bucket.
    */
-  void createBucket(KsmBucketArgs args) throws KSMException;
+  KsmBucketInfo getBucketInfo(String volumeName, String bucketName)
+      throws IOException;
 }

+ 45 - 12
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java

@@ -17,12 +17,16 @@
 package org.apache.hadoop.ozone.ksm;
 
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
+import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
 import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.BucketInfo;
 import org.iq80.leveldb.DBException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+
 /**
  * KSM bucket manager.
  */
@@ -64,16 +68,15 @@ public class BucketManagerImpl implements BucketManager {
 
   /**
    * Creates a bucket.
-   * @param args - KsmBucketArgs.
+   * @param bucketInfo - KsmBucketInfo.
    */
   @Override
-  public void createBucket(KsmBucketArgs args) throws KSMException {
-    Preconditions.checkNotNull(args);
+  public void createBucket(KsmBucketInfo bucketInfo) throws IOException {
+    Preconditions.checkNotNull(bucketInfo);
     metadataManager.writeLock().lock();
-    String volumeName = args.getVolumeName();
-    String bucketName = args.getBucketName();
+    String volumeName = bucketInfo.getVolumeName();
+    String bucketName = bucketInfo.getBucketName();
     try {
-      //bucket key: {volume/bucket}
       byte[] volumeKey = metadataManager.getVolumeKey(volumeName);
       byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
 
@@ -89,16 +92,46 @@ public class BucketManagerImpl implements BucketManager {
         throw new KSMException("Bucket already exist",
             KSMException.ResultCodes.FAILED_BUCKET_ALREADY_EXISTS);
       }
-      metadataManager.put(bucketKey, args.getProtobuf().toByteArray());
+      metadataManager.put(bucketKey, bucketInfo.getProtobuf().toByteArray());
 
       LOG.debug("created bucket: {} in volume: {}", bucketName, volumeName);
-    } catch (DBException ex) {
+    } catch (IOException | DBException ex) {
       LOG.error("Bucket creation failed for bucket:{} in volume:{}",
           bucketName, volumeName, ex);
-      throw new KSMException(ex.getMessage(),
-          KSMException.ResultCodes.FAILED_INTERNAL_ERROR);
+      throw ex;
     } finally {
       metadataManager.writeLock().unlock();
     }
   }
-}
+
+  /**
+   * Returns Bucket Information.
+   *
+   * @param volumeName - Name of the Volume.
+   * @param bucketName - Name of the Bucket.
+   */
+  @Override
+  public KsmBucketInfo getBucketInfo(String volumeName, String bucketName)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+    metadataManager.readLock().lock();
+    try {
+      byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
+      byte[] value = metadataManager.get(bucketKey);
+      if(value == null) {
+        LOG.error("bucket: {} not found in volume: {}.",
+            bucketName, volumeName);
+        throw new KSMException("Bucket not found",
+            KSMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
+      }
+      return KsmBucketInfo.getFromProtobuf(BucketInfo.parseFrom(value));
+    } catch (IOException | DBException ex) {
+      LOG.error("Exception while getting bucket info for bucket: {}",
+          bucketName, ex);
+      throw ex;
+    } finally {
+      metadataManager.readLock().unlock();
+    }
+  }
+}

+ 20 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java

@@ -32,12 +32,14 @@ public class KSMMetrics {
   private @Metric MutableCounterLong numVolumeModifies;
   private @Metric MutableCounterLong numVolumeInfos;
   private @Metric MutableCounterLong numBucketCreates;
+  private @Metric MutableCounterLong numBucketInfos;
 
   // Failure Metrics
   private @Metric MutableCounterLong numVolumeCreateFails;
   private @Metric MutableCounterLong numVolumeModifyFails;
   private @Metric MutableCounterLong numVolumeInfoFails;
   private @Metric MutableCounterLong numBucketCreateFails;
+  private @Metric MutableCounterLong numBucketInfoFails;
 
   public KSMMetrics() {
   }
@@ -65,6 +67,10 @@ public class KSMMetrics {
     numBucketCreates.incr();
   }
 
+  public void incNumBucketInfos() {
+    numBucketInfos.incr();
+  }
+
   public void incNumVolumeCreateFails() {
     numVolumeCreates.incr();
   }
@@ -81,6 +87,10 @@ public class KSMMetrics {
     numBucketCreateFails.incr();
   }
 
+  public void incNumBucketInfoFails() {
+    numBucketInfoFails.incr();
+  }
+
   @VisibleForTesting
   public long getNumVolumeCreates() {
     return numVolumeCreates.value();
@@ -101,6 +111,11 @@ public class KSMMetrics {
     return numBucketCreates.value();
   }
 
+  @VisibleForTesting
+  public long getNumBucketInfos() {
+    return numBucketInfos.value();
+  }
+
   @VisibleForTesting
   public long getNumVolumeCreateFails() {
     return numVolumeCreateFails.value();
@@ -121,4 +136,9 @@ public class KSMMetrics {
     return numBucketCreateFails.value();
   }
 
+  @VisibleForTesting
+  public long getNumBucketInfoFails() {
+    return numBucketInfoFails.value();
+  }
+
 }

+ 25 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java

@@ -22,7 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
+import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
 import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
 import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
 import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
@@ -339,17 +339,38 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
   /**
    * Creates a bucket.
    *
-   * @param args - Arguments to create Bucket.
+   * @param bucketInfo - BucketInfo to create bucket.
    * @throws IOException
    */
   @Override
-  public void createBucket(KsmBucketArgs args) throws IOException {
+  public void createBucket(KsmBucketInfo bucketInfo) throws IOException {
     try {
       metrics.incNumBucketCreates();
-      bucketManager.createBucket(args);
+      bucketManager.createBucket(bucketInfo);
     } catch (Exception ex) {
       metrics.incNumBucketCreateFails();
       throw ex;
     }
   }
+
+  /**
+   * Gets the bucket information.
+   *
+   * @param volume - Volume name.
+   * @param bucket - Bucket name.
+   * @return KsmBucketInfo or exception is thrown.
+   * @throws IOException
+   */
+  @Override
+  public KsmBucketInfo getBucketInfo(String volume, String bucket)
+      throws IOException {
+    try {
+      metrics.incNumBucketInfos();
+      return bucketManager.getBucketInfo(volume, bucket);
+    } catch (Exception ex) {
+      metrics.incNumBucketInfoFails();
+      throw ex;
+    }
+  }
+
 }

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java

@@ -101,6 +101,7 @@ public class KSMException extends IOException {
     FAILED_VOLUME_NOT_FOUND,
     FAILED_USER_NOT_FOUND,
     FAILED_BUCKET_ALREADY_EXISTS,
+    FAILED_BUCKET_NOT_FOUND,
     FAILED_INTERNAL_ERROR
   }
 }

+ 38 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KSMPBHelper.java

@@ -35,7 +35,7 @@ public final class KSMPBHelper {
   }
 
   /**
-   * Returns protobuf's OzoneAclInfo of the current instance.
+   * Converts OzoneAcl into protobuf's OzoneAclInfo.
    * @return OzoneAclInfo
    */
   public static OzoneAclInfo convertOzoneAcl(OzoneAcl acl) {
@@ -73,4 +73,41 @@ public final class KSMPBHelper {
         .setRights(aclRights)
         .build();
   }
+
+  /**
+   * Converts protobuf's OzoneAclInfo into OzoneAcl.
+   * @return OzoneAcl
+   */
+  public static OzoneAcl convertOzoneAcl(OzoneAclInfo aclInfo) {
+    OzoneAcl.OzoneACLType aclType;
+    switch(aclInfo.getType()) {
+    case USER:
+      aclType = OzoneAcl.OzoneACLType.USER;
+      break;
+    case GROUP:
+      aclType = OzoneAcl.OzoneACLType.GROUP;
+      break;
+    case WORLD:
+      aclType = OzoneAcl.OzoneACLType.WORLD;
+      break;
+    default:
+      throw new IllegalArgumentException("ACL type is not recognized");
+    }
+    OzoneAcl.OzoneACLRights aclRights;
+    switch(aclInfo.getRights()) {
+    case READ:
+      aclRights = OzoneAcl.OzoneACLRights.READ;
+      break;
+    case WRITE:
+      aclRights = OzoneAcl.OzoneACLRights.WRITE;
+      break;
+    case READ_WRITE:
+      aclRights = OzoneAcl.OzoneACLRights.READ_WRITE;
+      break;
+    default:
+      throw new IllegalArgumentException("ACL right is not recognized");
+    }
+
+    return new OzoneAcl(aclType, aclInfo.getName(), aclRights);
+  }
 }

+ 25 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java

@@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.protocolPB;
 
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
-import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
+import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
 import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
 import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
 import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
@@ -27,6 +27,10 @@ import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.CreateBucketRequest;
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.CreateBucketResponse;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.InfoBucketRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.InfoBucketResponse;
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.CreateVolumeRequest;
 import org.apache.hadoop.ozone.protocol.proto
@@ -91,6 +95,8 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements
         return Status.USER_NOT_FOUND;
       case FAILED_BUCKET_ALREADY_EXISTS:
         return Status.BUCKET_ALREADY_EXISTS;
+      case FAILED_BUCKET_NOT_FOUND:
+        return Status.BUCKET_NOT_FOUND;
       default:
         return Status.INTERNAL_ERROR;
       }
@@ -180,7 +186,7 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements
     CreateBucketResponse.Builder resp =
         CreateBucketResponse.newBuilder();
     try {
-      impl.createBucket(KsmBucketArgs.getFromProtobuf(
+      impl.createBucket(KsmBucketInfo.getFromProtobuf(
           request.getBucketInfo()));
       resp.setStatus(Status.OK);
     } catch (IOException e) {
@@ -188,4 +194,21 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements
     }
     return resp.build();
   }
+
+  @Override
+  public InfoBucketResponse infoBucket(
+      RpcController controller, InfoBucketRequest request)
+      throws ServiceException {
+    InfoBucketResponse.Builder resp =
+        InfoBucketResponse.newBuilder();
+    try {
+      KsmBucketInfo ksmBucketInfo = impl.getBucketInfo(
+          request.getVolumeName(), request.getBucketName());
+      resp.setStatus(Status.OK);
+      resp.setBucketInfo(ksmBucketInfo.getProtobuf());
+    } catch(IOException e) {
+      resp.setStatus(exceptionToResponseStatus(e));
+    }
+    return resp.build();
+  }
 }

+ 33 - 23
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java

@@ -25,9 +25,10 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto
 import org.apache.hadoop.hdfs.ozone.protocol.proto
     .ContainerProtos.KeyData;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset
     .LengthInputStream;
-import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
+import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
 import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
 import org.apache.hadoop.ksm.protocolPB
     .KeySpaceManagerProtocolClientSideTranslatorPB;
@@ -66,6 +67,7 @@ import java.util.Locale;
 import java.util.HashSet;
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*;
 import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.getKey;
@@ -175,16 +177,21 @@ public final class DistributedStorageHandler implements StorageHandler {
   @Override
   public void createBucket(final BucketArgs args)
       throws IOException, OzoneException {
-    KsmBucketArgs.Builder builder = KsmBucketArgs.newBuilder();
-    args.getAddAcls().forEach(acl ->
-        builder.addAddAcl(KSMPBHelper.convertOzoneAcl(acl)));
-    args.getRemoveAcls().forEach(acl ->
-        builder.addRemoveAcl(KSMPBHelper.convertOzoneAcl(acl)));
+    KsmBucketInfo.Builder builder = KsmBucketInfo.newBuilder();
     builder.setVolumeName(args.getVolumeName())
-        .setBucketName(args.getBucketName())
-        .setIsVersionEnabled(getBucketVersioningProtobuf(
-            args.getVersioning()))
-        .setStorageType(args.getStorageType());
+        .setBucketName(args.getBucketName());
+    if(args.getAddAcls() != null) {
+      builder.setAcls(args.getAddAcls().stream().map(
+          KSMPBHelper::convertOzoneAcl).collect(Collectors.toList()));
+    }
+    if(args.getStorageType() != null) {
+      builder.setStorageType(PBHelperClient.convertStorageType(
+          args.getStorageType()));
+    }
+    if(args.getVersioning() != null) {
+      builder.setIsVersionEnabled(getBucketVersioningProtobuf(
+          args.getVersioning()));
+    }
     keySpaceManagerClient.createBucket(builder.build());
   }
 
@@ -250,20 +257,23 @@ public final class DistributedStorageHandler implements StorageHandler {
 
   @Override
   public BucketInfo getBucketInfo(BucketArgs args)
-      throws IOException, OzoneException {
-    String containerKey = buildContainerKey(args.getVolumeName(),
-        args.getBucketName());
-    XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey);
-    try {
-      KeyData containerKeyData = containerKeyDataForRead(
-          xceiverClient.getPipeline().getContainerName(), containerKey);
-      GetKeyResponseProto response = getKey(xceiverClient, containerKeyData,
-          args.getRequestID());
-      return fromContainerKeyValueListToBucket(
-          response.getKeyData().getMetadataList());
-    } finally {
-      xceiverClientManager.releaseClient(xceiverClient);
+      throws IOException {
+    String volumeName = args.getVolumeName();
+    String bucketName = args.getBucketName();
+    KsmBucketInfo ksmBucketInfo = keySpaceManagerClient.getBucketInfo(
+        volumeName, bucketName);
+    BucketInfo bucketInfo = new BucketInfo(ksmBucketInfo.getVolumeName(),
+        ksmBucketInfo.getBucketName());
+    if(ksmBucketInfo.getIsVersionEnabled()) {
+      bucketInfo.setVersioning(Versioning.ENABLED);
+    } else {
+      bucketInfo.setVersioning(Versioning.DISABLED);
     }
+    bucketInfo.setStorageType(PBHelperClient.convertStorageType(
+        ksmBucketInfo.getStorageType()));
+    bucketInfo.setAcls(ksmBucketInfo.getAcls().stream().map(
+        KSMPBHelper::convertOzoneAcl).collect(Collectors.toList()));
+    return bucketInfo;
   }
 
   @Override

+ 44 - 18
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestBucketManagerImpl.java

@@ -16,9 +16,9 @@
  */
 package org.apache.hadoop.ozone.ksm;
 
-import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
 import org.apache.hadoop.ozone.ksm.exceptions
@@ -100,13 +100,11 @@ public class TestBucketManagerImpl {
     MetadataManager metaMgr = getMetadataManagerMock();
     try {
       BucketManager bucketManager = new BucketManagerImpl(metaMgr);
-      KsmBucketArgs bucketArgs = KsmBucketArgs.newBuilder()
+      KsmBucketInfo bucketInfo = KsmBucketInfo.newBuilder()
           .setVolumeName("sampleVol")
           .setBucketName("bucketOne")
-          .setStorageType(StorageType.DISK)
-          .setIsVersionEnabled(false)
           .build();
-      bucketManager.createBucket(bucketArgs);
+      bucketManager.createBucket(bucketInfo);
     } catch(KSMException ksmEx) {
       Assert.assertEquals(ResultCodes.FAILED_VOLUME_NOT_FOUND,
           ksmEx.getResult());
@@ -118,16 +116,12 @@ public class TestBucketManagerImpl {
   public void testCreateBucket() throws IOException {
     MetadataManager metaMgr = getMetadataManagerMock("sampleVol");
     BucketManager bucketManager = new BucketManagerImpl(metaMgr);
-    KsmBucketArgs bucketArgs = KsmBucketArgs.newBuilder()
+    KsmBucketInfo bucketInfo = KsmBucketInfo.newBuilder()
         .setVolumeName("sampleVol")
         .setBucketName("bucketOne")
-        .setStorageType(StorageType.DISK)
-        .setIsVersionEnabled(false)
         .build();
-    bucketManager.createBucket(bucketArgs);
-    //TODO: Use BucketManagerImpl#getBucketInfo to verify creation of bucket.
-    Assert.assertNotNull(metaMgr
-        .get(DFSUtil.string2Bytes("/sampleVol/bucketOne")));
+    bucketManager.createBucket(bucketInfo);
+    Assert.assertNotNull(bucketManager.getBucketInfo("sampleVol", "bucketOne"));
   }
 
   @Test
@@ -136,18 +130,50 @@ public class TestBucketManagerImpl {
     MetadataManager metaMgr = getMetadataManagerMock("sampleVol");
     try {
       BucketManager bucketManager = new BucketManagerImpl(metaMgr);
-      KsmBucketArgs bucketArgs = KsmBucketArgs.newBuilder()
+      KsmBucketInfo bucketInfo = KsmBucketInfo.newBuilder()
           .setVolumeName("sampleVol")
           .setBucketName("bucketOne")
-          .setStorageType(StorageType.DISK)
-          .setIsVersionEnabled(false)
           .build();
-      bucketManager.createBucket(bucketArgs);
-      bucketManager.createBucket(bucketArgs);
+      bucketManager.createBucket(bucketInfo);
+      bucketManager.createBucket(bucketInfo);
     } catch(KSMException ksmEx) {
       Assert.assertEquals(ResultCodes.FAILED_BUCKET_ALREADY_EXISTS,
           ksmEx.getResult());
       throw ksmEx;
     }
   }
+
+  @Test
+  public void testGetBucketInfoForInvalidBucket() throws IOException {
+    thrown.expectMessage("Bucket not found");
+    try {
+      MetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+      BucketManager bucketManager = new BucketManagerImpl(metaMgr);
+      bucketManager.getBucketInfo("sampleVol", "bucketOne");
+    } catch(KSMException ksmEx) {
+      Assert.assertEquals(ResultCodes.FAILED_BUCKET_NOT_FOUND,
+          ksmEx.getResult());
+      throw ksmEx;
+    }
+  }
+
+  @Test
+  public void testGetBucketInfo() throws IOException {
+    MetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+    BucketManager bucketManager = new BucketManagerImpl(metaMgr);
+    KsmBucketInfo bucketInfo = KsmBucketInfo.newBuilder()
+        .setVolumeName("sampleVol")
+        .setBucketName("bucketOne")
+        .setStorageType(HdfsProtos.StorageTypeProto.DISK)
+        .setIsVersionEnabled(false)
+        .build();
+    bucketManager.createBucket(bucketInfo);
+    KsmBucketInfo result = bucketManager.getBucketInfo(
+        "sampleVol", "bucketOne");
+    Assert.assertEquals("sampleVol", result.getVolumeName());
+    Assert.assertEquals("bucketOne", result.getBucketName());
+    Assert.assertEquals(HdfsProtos.StorageTypeProto.DISK,
+        result.getStorageType());
+    Assert.assertEquals(false, result.getIsVersionEnabled());
+  }
 }

+ 37 - 10
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java

@@ -24,10 +24,12 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.web.exceptions.OzoneException;
+import org.apache.hadoop.ozone.web.handlers.BucketArgs;
 import org.apache.hadoop.ozone.web.handlers.UserArgs;
 import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
 import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
 import org.apache.hadoop.ozone.web.request.OzoneQuota;
+import org.apache.hadoop.ozone.web.response.BucketInfo;
 import org.apache.hadoop.ozone.web.response.VolumeInfo;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -44,7 +46,7 @@ import java.util.Random;
 public class TestKeySpaceManager {
   private static MiniOzoneCluster cluster = null;
   private static StorageHandler storageHandler;
-  private static UserArgs volUserArgs;
+  private static UserArgs userArgs;
   private static KSMMetrics ksmMetrics;
 
   /**
@@ -63,7 +65,7 @@ public class TestKeySpaceManager {
     cluster = new MiniOzoneCluster.Builder(conf)
         .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
     storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
-    volUserArgs = new UserArgs(null, null, null, null, null, null);
+    userArgs = new UserArgs(null, null, null, null, null, null);
     ksmMetrics = cluster.getKeySpaceManager().getMetrics();
   }
 
@@ -84,12 +86,12 @@ public class TestKeySpaceManager {
     String adminName = "admin" + RandomStringUtils.randomNumeric(5);
     String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
 
-    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, volUserArgs);
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
     createVolumeArgs.setUserName(userName);
     createVolumeArgs.setAdminName(adminName);
     storageHandler.createVolume(createVolumeArgs);
 
-    VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, volUserArgs);
+    VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, userArgs);
     VolumeInfo retVolumeinfo = storageHandler.getVolumeInfo(getVolumeArgs);
     Assert.assertTrue(retVolumeinfo.getVolumeName().equals(volumeName));
     Assert.assertTrue(retVolumeinfo.getOwner().getName().equals(userName));
@@ -103,7 +105,7 @@ public class TestKeySpaceManager {
     String adminName = "admin" + RandomStringUtils.randomNumeric(5);
     String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
 
-    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, volUserArgs);
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
     createVolumeArgs.setUserName(userName);
     createVolumeArgs.setAdminName(adminName);
     storageHandler.createVolume(createVolumeArgs);
@@ -112,7 +114,7 @@ public class TestKeySpaceManager {
     createVolumeArgs.setUserName(newUserName);
     storageHandler.setVolumeOwner(createVolumeArgs);
 
-    VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, volUserArgs);
+    VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, userArgs);
     VolumeInfo retVolumeInfo = storageHandler.getVolumeInfo(getVolumeArgs);
 
     Assert.assertTrue(retVolumeInfo.getVolumeName().equals(volumeName));
@@ -133,13 +135,13 @@ public class TestKeySpaceManager {
     // Create a new volume with a quota
     OzoneQuota createQuota =
         new OzoneQuota(rand.nextInt(100), OzoneQuota.Units.GB);
-    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, volUserArgs);
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
     createVolumeArgs.setUserName(userName);
     createVolumeArgs.setAdminName(adminName);
     createVolumeArgs.setQuota(createQuota);
     storageHandler.createVolume(createVolumeArgs);
 
-    VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, volUserArgs);
+    VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, userArgs);
     VolumeInfo retVolumeInfo = storageHandler.getVolumeInfo(getVolumeArgs);
     Assert.assertEquals(retVolumeInfo.getQuota().sizeInBytes(),
                                               createQuota.sizeInBytes());
@@ -149,18 +151,43 @@ public class TestKeySpaceManager {
         new OzoneQuota(rand.nextInt(100), OzoneQuota.Units.GB);
     createVolumeArgs.setQuota(setQuota);
     storageHandler.setVolumeQuota(createVolumeArgs, false);
-    getVolumeArgs = new VolumeArgs(volumeName, volUserArgs);
+    getVolumeArgs = new VolumeArgs(volumeName, userArgs);
     retVolumeInfo = storageHandler.getVolumeInfo(getVolumeArgs);
     Assert.assertEquals(retVolumeInfo.getQuota().sizeInBytes(),
                                                 setQuota.sizeInBytes());
 
     // Remove the quota and test it again
     storageHandler.setVolumeQuota(createVolumeArgs, true);
-    getVolumeArgs = new VolumeArgs(volumeName, volUserArgs);
+    getVolumeArgs = new VolumeArgs(volumeName, userArgs);
     retVolumeInfo = storageHandler.getVolumeInfo(getVolumeArgs);
     Assert.assertEquals(retVolumeInfo.getQuota().sizeInBytes(),
         OzoneConsts.MAX_QUOTA_IN_BYTES);
     Assert.assertEquals(0, ksmMetrics.getNumVolumeCreateFails());
     Assert.assertEquals(0, ksmMetrics.getNumVolumeInfoFails());
   }
+
+  @Test(timeout = 60000)
+  public void testCreateBucket() throws IOException, OzoneException {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs volumeArgs = new VolumeArgs(volumeName, userArgs);
+    volumeArgs.setUserName(userName);
+    volumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(volumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(volumeName, bucketName, userArgs);
+    storageHandler.createBucket(bucketArgs);
+
+    BucketArgs getBucketArgs = new BucketArgs(volumeName, bucketName,
+        userArgs);
+    BucketInfo bucketInfo = storageHandler.getBucketInfo(getBucketArgs);
+    Assert.assertTrue(bucketInfo.getVolumeName().equals(volumeName));
+    Assert.assertTrue(bucketInfo.getBucketName().equals(bucketName));
+    Assert.assertEquals(0, ksmMetrics.getNumVolumeCreateFails());
+    Assert.assertEquals(0, ksmMetrics.getNumBucketCreateFails());
+    Assert.assertEquals(0, ksmMetrics.getNumBucketInfoFails());
+  }
 }