Browse Source

HDFS-11777. Ozone: KSM: add deleteBucket. Contributed by Nandakumar Vadivelu.

Xiaoyu Yao 8 years ago
parent
commit
2007e85d5b
14 changed files with 406 additions and 14 deletions
  1. 7 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java
  2. 27 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
  3. 17 1
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
  4. 8 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManager.java
  5. 43 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java
  6. 20 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
  7. 16 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
  8. 17 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java
  9. 31 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java
  10. 1 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
  11. 20 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java
  12. 2 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
  13. 90 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestBucketManagerImpl.java
  14. 107 10
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java

@@ -154,4 +154,11 @@ public interface KeySpaceManagerProtocol {
    */
   void deleteKey(KsmKeyArgs args) throws IOException;
 
+  /**
+   * Deletes an existing empty bucket from volume.
+   * @param volume - Name of the volume.
+   * @param bucket - Name of the bucket.
+   * @throws IOException
+   */
+  void deleteBucket(String volume, String bucket) throws IOException;
 }

+ 27 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java

@@ -44,6 +44,10 @@ import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.SetBucketPropertyRequest;
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.SetBucketPropertyResponse;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.DeleteBucketRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.DeleteBucketResponse;
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.CreateVolumeRequest;
 import org.apache.hadoop.ozone.protocol.proto
@@ -472,6 +476,29 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB
     }
   }
 
+  /**
+   * Deletes an existing empty bucket from volume.
+   * @param volume - Name of the volume.
+   * @param bucket - Name of the bucket.
+   * @throws IOException
+   */
+  public void deleteBucket(String volume, String bucket) throws IOException {
+    DeleteBucketRequest.Builder req = DeleteBucketRequest.newBuilder();
+    req.setVolumeName(volume);
+    req.setBucketName(bucket);
+    final DeleteBucketResponse resp;
+    try {
+      resp = rpcProxy.deleteBucket(NULL_RPC_CONTROLLER, req.build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+    if (resp.getStatus() != Status.OK) {
+      throw new
+          IOException("Delete Bucket failed, error:" + resp.getStatus());
+    }
+  }
+
+
   /**
    * Return the proxy object underlying this protocol translator.
    *

+ 17 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto

@@ -248,6 +248,16 @@ message SetBucketPropertyResponse {
     required Status status = 1;
 }
 
+message DeleteBucketRequest {
+    required string volumeName = 1;
+    required string bucketName = 2;
+}
+
+message DeleteBucketResponse {
+    required Status status = 1;
+}
+
+
 /**
  The KSM service that takes care of Ozone namespace.
 */
@@ -323,4 +333,10 @@ service KeySpaceManagerService {
     */
     rpc deleteKey(LocateKeyRequest)
         returns(LocateKeyResponse);
-}
+
+    /**
+       Deletes a bucket from volume if it is empty.
+    */
+    rpc deleteBucket(DeleteBucketRequest)
+        returns (DeleteBucketResponse);
+}

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManager.java

@@ -44,4 +44,12 @@ public interface BucketManager {
    * @throws IOException
    */
   void setBucketProperty(KsmBucketArgs args) throws IOException;
+
+  /**
+   * Deletes an existing empty bucket from volume.
+   * @param volumeName - Name of the volume.
+   * @param bucketName - Name of the bucket.
+   * @throws IOException
+   */
+  void deleteBucket(String volumeName, String bucketName) throws IOException;
 }

+ 43 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/BucketManagerImpl.java

@@ -247,4 +247,47 @@ public class BucketManagerImpl implements BucketManager {
     }
     return acls;
   }
+
+  /**
+   * Deletes an existing empty bucket from volume.
+   * @param volumeName - Name of the volume.
+   * @param bucketName - Name of the bucket.
+   * @throws IOException
+   */
+  public void deleteBucket(String volumeName, String bucketName)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+    metadataManager.writeLock().lock();
+    try {
+      byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
+      //Check if volume exists
+      if(metadataManager.get(metadataManager.getVolumeKey(volumeName)) ==
+          null) {
+        LOG.error("volume: {} not found ", volumeName);
+        throw new KSMException("Volume doesn't exist",
+            KSMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
+      }
+      //Check if bucket exist
+      if(metadataManager.get(bucketKey) == null) {
+        LOG.error("bucket: {} not found ", bucketName);
+        throw new KSMException("Bucket doesn't exist",
+            KSMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
+      }
+      //Check if bucket is empty
+      if(!metadataManager.isBucketEmpty(volumeName, bucketName)) {
+        LOG.error("bucket: {} is not empty ", bucketName);
+        throw new KSMException("Bucket is not empty",
+            KSMException.ResultCodes.FAILED_BUCKET_NOT_EMPTY);
+      }
+      metadataManager.delete(bucketKey);
+    } catch (IOException ex) {
+      LOG.error("Delete bucket failed for bucket:{} in volume:{}",
+          bucketName, volumeName, ex);
+      throw ex;
+    } finally {
+      metadataManager.writeLock().unlock();
+    }
+  }
+
 }

+ 20 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java

@@ -36,6 +36,7 @@ public class KSMMetrics {
   private @Metric MutableCounterLong numVolumeDeletes;
   private @Metric MutableCounterLong numBucketInfos;
   private @Metric MutableCounterLong numBucketModifies;
+  private @Metric MutableCounterLong numBucketDeletes;
   private @Metric MutableCounterLong numKeyAllocate;
   private @Metric MutableCounterLong numKeyLookup;
   private @Metric MutableCounterLong numKeyDeletes;
@@ -49,6 +50,7 @@ public class KSMMetrics {
   private @Metric MutableCounterLong numVolumeCheckAccessFails;
   private @Metric MutableCounterLong numBucketInfoFails;
   private @Metric MutableCounterLong numBucketModifyFails;
+  private @Metric MutableCounterLong numBucketDeleteFails;
   private @Metric MutableCounterLong numKeyAllocateFails;
   private @Metric MutableCounterLong numKeyLookupFails;
   private @Metric MutableCounterLong numKeyDeleteFails;
@@ -95,6 +97,10 @@ public class KSMMetrics {
     numBucketModifies.incr();
   }
 
+  public void incNumBucketDeletes() {
+    numBucketDeletes.incr();
+  }
+
   public void incNumVolumeCreateFails() {
     numVolumeCreateFails.incr();
   }
@@ -127,6 +133,10 @@ public class KSMMetrics {
     numBucketModifyFails.incr();
   }
 
+  public void incNumBucketDeleteFails() {
+    numBucketDeleteFails.incr();
+  }
+
   public void incNumKeyAllocates() {
     numKeyAllocate.incr();
   }
@@ -191,6 +201,11 @@ public class KSMMetrics {
     return numBucketModifies.value();
   }
 
+  @VisibleForTesting
+  public long getNumBucketDeletes() {
+    return numBucketDeletes.value();
+  }
+
   @VisibleForTesting
   public long getNumVolumeCreateFails() {
     return numVolumeCreateFails.value();
@@ -231,6 +246,11 @@ public class KSMMetrics {
     return numBucketModifyFails.value();
   }
 
+  @VisibleForTesting
+  public long getNumBucketDeleteFails() {
+    return numBucketDeleteFails.value();
+  }
+
   @VisibleForTesting
   public long getNumKeyAllocates() {
     return numKeyAllocate.value();

+ 16 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java

@@ -471,7 +471,23 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
       metrics.incNumBucketModifyFails();
       throw ex;
     }
+  }
+
 
+  /**
+   * Deletes an existing empty bucket from volume.
+   * @param volume - Name of the volume.
+   * @param bucket - Name of the bucket.
+   * @throws IOException
+   */
+  public void deleteBucket(String volume, String bucket) throws IOException {
+    try {
+      metrics.incNumBucketDeletes();
+      bucketManager.deleteBucket(volume, bucket);
+    } catch (Exception ex) {
+      metrics.incNumBucketDeleteFails();
+      throw ex;
+    }
   }
 
 }

+ 17 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java

@@ -61,6 +61,12 @@ public interface MetadataManager {
    */
   void put(byte[] key, byte[] value);
 
+  /**
+   * Deletes a Key from Metadata DB.
+   * @param key   - key
+   */
+  void delete(byte[] key);
+
   /**
    * Performs batch Put and Delete to Metadata DB.
    * Can be used to do multiple puts and deletes atomically.
@@ -113,8 +119,18 @@ public interface MetadataManager {
   void deleteKey(byte[] key);
 
   /**
-   * Given a volume, check if it is empty, i.e there are no buckets inside it.
+   * Given a volume, check if it is empty,
+   * i.e there are no buckets inside it.
    * @param volume - Volume name
    */
   boolean isVolumeEmpty(String volume) throws IOException;
+
+  /**
+   * Given a volume/bucket, check if it is empty,
+   * i.e there are no keys inside it.
+   * @param volume - Volume name
+   * @param  bucket - Bucket name
+   * @return true if the bucket is empty
+   */
+  boolean isBucketEmpty(String volume, String bucket) throws IOException;
 }

+ 31 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java

@@ -162,6 +162,14 @@ public class MetadataManagerImpl implements  MetadataManager {
     store.put(key, value);
   }
 
+  /**
+   * Deletes a Key from Metadata DB.
+   * @param key   - key
+   */
+  public void delete(byte[] key) {
+    store.delete(key);
+  }
+
   /**
    * Performs a batch Put and Delete from Metadata DB.
    * Can be used to do multiple puts and deletes atomically.
@@ -221,4 +229,27 @@ public class MetadataManagerImpl implements  MetadataManager {
       }
     }
   }
+
+  /**
+   * Given a volume/bucket, check if it is empty,
+   * i.e there are no keys inside it.
+   * @param volume - Volume name
+   * @param bucket - Bucket name
+   * @return true if the bucket is empty
+   */
+  public boolean isBucketEmpty(String volume, String bucket)
+      throws IOException {
+    try (DBIterator iterator = store.getIterator()) {
+      String keyRootName = OzoneConsts.KSM_VOLUME_PREFIX + volume
+          + OzoneConsts.KSM_BUCKET_PREFIX + bucket
+          + OzoneConsts.KSM_KEY_PREFIX;
+      byte[] keyRoot = DFSUtil.string2Bytes(keyRootName);
+      iterator.seek(keyRoot);
+      if(iterator.hasNext()) {
+        return !DFSUtil.bytes2String(iterator.next().getKey())
+            .startsWith(keyRootName);
+      }
+      return true;
+    }
+  }
 }

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java

@@ -103,6 +103,7 @@ public class KSMException extends IOException {
     FAILED_USER_NOT_FOUND,
     FAILED_BUCKET_ALREADY_EXISTS,
     FAILED_BUCKET_NOT_FOUND,
+    FAILED_BUCKET_NOT_EMPTY,
     FAILED_KEY_ALREADY_EXISTS,
     FAILED_KEY_NOT_FOUND,
     FAILED_INTERNAL_ERROR

+ 20 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java

@@ -38,6 +38,10 @@ import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.SetBucketPropertyRequest;
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.SetBucketPropertyResponse;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.DeleteBucketRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.DeleteBucketResponse;
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.CreateVolumeRequest;
 import org.apache.hadoop.ozone.protocol.proto
@@ -112,6 +116,8 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements
         return Status.BUCKET_ALREADY_EXISTS;
       case FAILED_BUCKET_NOT_FOUND:
         return Status.BUCKET_NOT_FOUND;
+      case FAILED_BUCKET_NOT_EMPTY:
+        return Status.BUCKET_NOT_EMPTY;
       case FAILED_KEY_ALREADY_EXISTS:
         return Status.KEY_ALREADY_EXISTS;
       case FAILED_KEY_NOT_FOUND:
@@ -333,4 +339,18 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements
     }
     return resp.build();
   }
+
+  @Override
+  public DeleteBucketResponse deleteBucket(
+      RpcController controller, DeleteBucketRequest request)
+      throws ServiceException {
+    DeleteBucketResponse.Builder resp = DeleteBucketResponse.newBuilder();
+    resp.setStatus(Status.OK);
+    try {
+      impl.deleteBucket(request.getVolumeName(), request.getBucketName());
+    } catch (IOException e) {
+      resp.setStatus(exceptionToResponseStatus(e));
+    }
+    return resp.build();
+  }
 }

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java

@@ -286,7 +286,8 @@ public final class DistributedStorageHandler implements StorageHandler {
   @Override
   public void deleteBucket(BucketArgs args)
       throws IOException, OzoneException {
-    throw new UnsupportedOperationException("deleteBucket not implemented");
+    keySpaceManagerClient.deleteBucket(args.getVolumeName(),
+        args.getBucketName());
   }
 
   @Override

+ 90 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestBucketManagerImpl.java

@@ -41,6 +41,7 @@ import org.mockito.stubbing.Answer;
 import java.io.IOException;
 import java.util.Map;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.LinkedList;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -77,6 +78,27 @@ public class TestBucketManagerImpl {
                         + OzoneConsts.KSM_BUCKET_PREFIX
                         + invocation.getArguments()[1]));
 
+    Mockito.doAnswer(
+        new Answer<Boolean>() {
+          @Override
+          public Boolean answer(InvocationOnMock invocation)
+              throws Throwable {
+            String keyRootName =  OzoneConsts.KSM_VOLUME_PREFIX
+                + invocation.getArguments()[0]
+                + OzoneConsts.KSM_BUCKET_PREFIX
+                + invocation.getArguments()[1]
+                + OzoneConsts.KSM_KEY_PREFIX;
+            Iterator<String> keyIterator = metadataDB.keySet().iterator();
+            while(keyIterator.hasNext()) {
+              if(keyIterator.next().startsWith(keyRootName)) {
+                return false;
+              }
+            }
+            return true;
+          }
+        }).when(metadataManager).isBucketEmpty(any(String.class),
+        any(String.class));
+
     Mockito.doAnswer(
         new Answer<Void>() {
           @Override
@@ -93,6 +115,16 @@ public class TestBucketManagerImpl {
             metadataDB.get(DFSUtil.bytes2String(
                 (byte[])invocation.getArguments()[0]))
     );
+    Mockito.doAnswer(
+        new Answer<Void>() {
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            metadataDB.remove(DFSUtil.bytes2String(
+                (byte[])invocation.getArguments()[0]));
+            return null;
+          }
+        }).when(metadataManager).delete(any(byte[].class));
+
     for(String volumeName : volumesToCreate) {
       byte[] dummyVolumeInfo = DFSUtil.string2Bytes(volumeName);
       metadataDB.put(OzoneConsts.KSM_VOLUME_PREFIX + volumeName,
@@ -308,4 +340,61 @@ public class TestBucketManagerImpl {
         "sampleVol", "bucketOne");
     Assert.assertTrue(updatedResult.getIsVersionEnabled());
   }
-}
+
+  @Test
+  public void testDeleteBucket() throws IOException {
+    thrown.expectMessage("Bucket not found");
+    MetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+    BucketManager bucketManager = new BucketManagerImpl(metaMgr);
+    for(int i = 0; i < 5; i++) {
+      KsmBucketInfo bucketInfo = KsmBucketInfo.newBuilder()
+          .setVolumeName("sampleVol")
+          .setBucketName("bucket_" + i)
+          .build();
+      bucketManager.createBucket(bucketInfo);
+    }
+    for(int i = 0; i < 5; i++) {
+      Assert.assertEquals("bucket_" + i,
+          bucketManager.getBucketInfo(
+              "sampleVol", "bucket_" + i).getBucketName());
+    }
+    try {
+      bucketManager.deleteBucket("sampleVol", "bucket_1");
+      Assert.assertNotNull(bucketManager.getBucketInfo(
+          "sampleVol", "bucket_2"));
+    } catch(IOException ex) {
+      Assert.fail(ex.getMessage());
+    }
+    try {
+      bucketManager.getBucketInfo("sampleVol", "bucket_1");
+    } catch(KSMException ksmEx) {
+      Assert.assertEquals(ResultCodes.FAILED_BUCKET_NOT_FOUND,
+          ksmEx.getResult());
+      throw ksmEx;
+    }
+  }
+
+  @Test
+  public void testDeleteNonEmptyBucket() throws IOException {
+    thrown.expectMessage("Bucket is not empty");
+    MetadataManager metaMgr = getMetadataManagerMock("sampleVol");
+    BucketManager bucketManager = new BucketManagerImpl(metaMgr);
+    KsmBucketInfo bucketInfo = KsmBucketInfo.newBuilder()
+        .setVolumeName("sampleVol")
+        .setBucketName("bucketOne")
+        .build();
+    bucketManager.createBucket(bucketInfo);
+    //Create keys in bucket
+    metaMgr.put(DFSUtil.string2Bytes("/sampleVol/bucketOne/key_one"),
+        DFSUtil.string2Bytes("value_one"));
+    metaMgr.put(DFSUtil.string2Bytes("/sampleVol/bucketOne/key_two"),
+        DFSUtil.string2Bytes("value_two"));
+    try {
+      bucketManager.deleteBucket("sampleVol", "bucketOne");
+    } catch(KSMException ksmEx) {
+      Assert.assertEquals(ResultCodes.FAILED_BUCKET_NOT_EMPTY,
+          ksmEx.getResult());
+      throw ksmEx;
+    }
+  }
+}

+ 107 - 10
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java

@@ -98,6 +98,7 @@ public class TestKeySpaceManager {
   // Create a volume and test its attribute after creating them
   @Test(timeout = 60000)
   public void testCreateVolume() throws IOException, OzoneException {
+    long volumeCreateFailCount = ksmMetrics.getNumVolumeCreateFails();
     String userName = "user" + RandomStringUtils.randomNumeric(5);
     String adminName = "admin" + RandomStringUtils.randomNumeric(5);
     String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
@@ -111,12 +112,15 @@ public class TestKeySpaceManager {
     VolumeInfo retVolumeinfo = storageHandler.getVolumeInfo(getVolumeArgs);
     Assert.assertTrue(retVolumeinfo.getVolumeName().equals(volumeName));
     Assert.assertTrue(retVolumeinfo.getOwner().getName().equals(userName));
-    Assert.assertEquals(0, ksmMetrics.getNumVolumeCreateFails());
+    Assert.assertEquals(volumeCreateFailCount,
+        ksmMetrics.getNumVolumeCreateFails());
   }
 
   // Create a volume and modify the volume owner and then test its attributes
   @Test(timeout = 60000)
   public void testChangeVolumeOwner() throws IOException, OzoneException {
+    long volumeCreateFailCount = ksmMetrics.getNumVolumeCreateFails();
+    long volumeInfoFailCount = ksmMetrics.getNumVolumeInfoFails();
     String userName = "user" + RandomStringUtils.randomNumeric(5);
     String adminName = "admin" + RandomStringUtils.randomNumeric(5);
     String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
@@ -136,13 +140,17 @@ public class TestKeySpaceManager {
     Assert.assertTrue(retVolumeInfo.getVolumeName().equals(volumeName));
     Assert.assertFalse(retVolumeInfo.getOwner().getName().equals(userName));
     Assert.assertTrue(retVolumeInfo.getOwner().getName().equals(newUserName));
-    Assert.assertEquals(0, ksmMetrics.getNumVolumeCreateFails());
-    Assert.assertEquals(0, ksmMetrics.getNumVolumeInfoFails());
+    Assert.assertEquals(volumeCreateFailCount,
+        ksmMetrics.getNumVolumeCreateFails());
+    Assert.assertEquals(volumeInfoFailCount,
+        ksmMetrics.getNumVolumeInfoFails());
   }
 
   // Create a volume and modify the volume owner and then test its attributes
   @Test(timeout = 60000)
   public void testChangeVolumeQuota() throws IOException, OzoneException {
+    long numVolumeCreateFail = ksmMetrics.getNumVolumeCreateFails();
+    long numVolumeInfoFail = ksmMetrics.getNumVolumeInfoFails();
     String userName = "user" + RandomStringUtils.randomNumeric(5);
     String adminName = "admin" + RandomStringUtils.randomNumeric(5);
     String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
@@ -178,13 +186,16 @@ public class TestKeySpaceManager {
     retVolumeInfo = storageHandler.getVolumeInfo(getVolumeArgs);
     Assert.assertEquals(retVolumeInfo.getQuota().sizeInBytes(),
         OzoneConsts.MAX_QUOTA_IN_BYTES);
-    Assert.assertEquals(0, ksmMetrics.getNumVolumeCreateFails());
-    Assert.assertEquals(0, ksmMetrics.getNumVolumeInfoFails());
+    Assert.assertEquals(numVolumeCreateFail,
+        ksmMetrics.getNumVolumeCreateFails());
+    Assert.assertEquals(numVolumeInfoFail,
+        ksmMetrics.getNumVolumeInfoFails());
   }
 
   // Create a volume and then delete it and then check for deletion
   @Test(timeout = 60000)
   public void testDeleteVolume() throws IOException, OzoneException {
+    long volumeCreateFailCount = ksmMetrics.getNumVolumeCreateFails();
     String userName = "user" + RandomStringUtils.randomNumeric(5);
     String adminName = "admin" + RandomStringUtils.randomNumeric(5);
     String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
@@ -208,7 +219,8 @@ public class TestKeySpaceManager {
     volumeInfo = storageHandler.getVolumeInfo(volumeArgs);
     Assert.assertTrue(volumeInfo.getVolumeName().equals(volumeName1));
     Assert.assertTrue(volumeInfo.getOwner().getName().equals(userName));
-    Assert.assertEquals(0, ksmMetrics.getNumVolumeCreateFails());
+    Assert.assertEquals(volumeCreateFailCount,
+        ksmMetrics.getNumVolumeCreateFails());
 
     // Volume with _A should be able to delete as it is empty.
     storageHandler.deleteVolume(volumeArgs);
@@ -229,6 +241,7 @@ public class TestKeySpaceManager {
   // then delete it and then check for deletion failure
   @Test(timeout = 60000)
   public void testFailedDeleteVolume() throws IOException, OzoneException {
+    long numVolumeCreateFails = ksmMetrics.getNumVolumeCreateFails();
     String userName = "user" + RandomStringUtils.randomNumeric(5);
     String adminName = "admin" + RandomStringUtils.randomNumeric(5);
     String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
@@ -243,7 +256,8 @@ public class TestKeySpaceManager {
     VolumeInfo retVolumeInfo = storageHandler.getVolumeInfo(getVolumeArgs);
     Assert.assertTrue(retVolumeInfo.getVolumeName().equals(volumeName));
     Assert.assertTrue(retVolumeInfo.getOwner().getName().equals(userName));
-    Assert.assertEquals(0, ksmMetrics.getNumVolumeCreateFails());
+    Assert.assertEquals(numVolumeCreateFails,
+        ksmMetrics.getNumVolumeCreateFails());
 
     BucketArgs bucketArgs = new BucketArgs(volumeName, bucketName, userArgs);
     storageHandler.createBucket(bucketArgs);
@@ -306,6 +320,9 @@ public class TestKeySpaceManager {
 
   @Test(timeout = 60000)
   public void testCreateBucket() throws IOException, OzoneException {
+    long numVolumeCreateFail = ksmMetrics.getNumVolumeCreateFails();
+    long numBucketCreateFail = ksmMetrics.getNumBucketCreateFails();
+    long numBucketInfoFail = ksmMetrics.getNumBucketInfoFails();
     String userName = "user" + RandomStringUtils.randomNumeric(5);
     String adminName = "admin" + RandomStringUtils.randomNumeric(5);
     String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
@@ -324,9 +341,89 @@ public class TestKeySpaceManager {
     BucketInfo bucketInfo = storageHandler.getBucketInfo(getBucketArgs);
     Assert.assertTrue(bucketInfo.getVolumeName().equals(volumeName));
     Assert.assertTrue(bucketInfo.getBucketName().equals(bucketName));
-    Assert.assertEquals(0, ksmMetrics.getNumVolumeCreateFails());
-    Assert.assertEquals(0, ksmMetrics.getNumBucketCreateFails());
-    Assert.assertEquals(0, ksmMetrics.getNumBucketInfoFails());
+    Assert.assertEquals(numVolumeCreateFail,
+        ksmMetrics.getNumVolumeCreateFails());
+    Assert.assertEquals(numBucketCreateFail,
+        ksmMetrics.getNumBucketCreateFails());
+    Assert.assertEquals(numBucketInfoFail,
+        ksmMetrics.getNumBucketInfoFails());
+  }
+
+  @Test(timeout = 60000)
+  public void testDeleteBucket() throws IOException, OzoneException {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+    VolumeArgs volumeArgs = new VolumeArgs(volumeName, userArgs);
+    volumeArgs.setUserName(userName);
+    volumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(volumeArgs);
+    BucketArgs bucketArgs = new BucketArgs(volumeName, bucketName, userArgs);
+    storageHandler.createBucket(bucketArgs);
+    BucketArgs getBucketArgs = new BucketArgs(volumeName, bucketName,
+        userArgs);
+    BucketInfo bucketInfo = storageHandler.getBucketInfo(getBucketArgs);
+    Assert.assertTrue(bucketInfo.getVolumeName().equals(volumeName));
+    Assert.assertTrue(bucketInfo.getBucketName().equals(bucketName));
+    storageHandler.deleteBucket(bucketArgs);
+    exception.expect(IOException.class);
+    exception.expectMessage("Info Bucket failed, error: BUCKET_NOT_FOUND");
+    storageHandler.getBucketInfo(getBucketArgs);
+  }
+
+  @Test(timeout = 60000)
+  public void testDeleteNonExistingBucket() throws IOException, OzoneException {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+    VolumeArgs volumeArgs = new VolumeArgs(volumeName, userArgs);
+    volumeArgs.setUserName(userName);
+    volumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(volumeArgs);
+    BucketArgs bucketArgs = new BucketArgs(volumeName, bucketName, userArgs);
+    storageHandler.createBucket(bucketArgs);
+    BucketArgs getBucketArgs = new BucketArgs(volumeName, bucketName,
+        userArgs);
+    BucketInfo bucketInfo = storageHandler.getBucketInfo(getBucketArgs);
+    Assert.assertTrue(bucketInfo.getVolumeName().equals(volumeName));
+    Assert.assertTrue(bucketInfo.getBucketName().equals(bucketName));
+    BucketArgs newBucketArgs = new BucketArgs(
+        volumeName, bucketName + "_invalid", userArgs);
+    exception.expect(IOException.class);
+    exception.expectMessage("Delete Bucket failed, error:BUCKET_NOT_FOUND");
+    storageHandler.deleteBucket(newBucketArgs);
+  }
+
+
+  @Test(timeout = 60000)
+  public void testDeleteNonEmptyBucket() throws IOException, OzoneException {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+    String keyName = "key" + RandomStringUtils.randomNumeric(5);
+    VolumeArgs volumeArgs = new VolumeArgs(volumeName, userArgs);
+    volumeArgs.setUserName(userName);
+    volumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(volumeArgs);
+    BucketArgs bucketArgs = new BucketArgs(volumeName, bucketName, userArgs);
+    storageHandler.createBucket(bucketArgs);
+    BucketArgs getBucketArgs = new BucketArgs(volumeName, bucketName,
+        userArgs);
+    BucketInfo bucketInfo = storageHandler.getBucketInfo(getBucketArgs);
+    Assert.assertTrue(bucketInfo.getVolumeName().equals(volumeName));
+    Assert.assertTrue(bucketInfo.getBucketName().equals(bucketName));
+    String dataString = RandomStringUtils.randomAscii(100);
+    KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
+    keyArgs.setSize(100);
+    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
+      stream.write(dataString.getBytes());
+    }
+    exception.expect(IOException.class);
+    exception.expectMessage("Delete Bucket failed, error:BUCKET_NOT_EMPTY");
+    storageHandler.deleteBucket(bucketArgs);
   }
 
   /**