Przeglądaj źródła

HDDS-1620. Implement Volume Write Requests to use Cache and DoubleBuffer. (#884)

Bharat Viswanadham 5 lat temu
rodzic
commit
88c53d516c
44 zmienionych plików z 2778 dodań i 157 usunięć
  1. 10 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/Table.java
  2. 7 2
      hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java
  3. 1 1
      hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/CacheKey.java
  4. 1 1
      hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/CacheValue.java
  5. 6 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/PartialTableCache.java
  6. 9 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/TableCache.java
  7. 4 4
      hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/TestPartialTableCache.java
  8. 23 0
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeArgs.java
  9. 1 1
      hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
  10. 1 1
      hadoop-ozone/ozone-manager/pom.xml
  11. 25 1
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
  12. 16 2
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
  13. 4 2
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
  14. 10 43
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
  15. 24 2
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
  16. 4 2
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
  17. 2 1
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java
  18. 7 16
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketSetPropertyRequest.java
  19. 196 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeCreateRequest.java
  20. 207 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeDeleteRequest.java
  21. 98 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeRequest.java
  22. 212 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetOwnerRequest.java
  23. 166 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetQuotaRequest.java
  24. 22 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/package-info.java
  25. 2 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/bucket/OMBucketCreateResponse.java
  26. 3 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/bucket/OMBucketDeleteResponse.java
  27. 3 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/bucket/OMBucketSetPropertyResponse.java
  28. 17 13
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMVolumeCreateResponse.java
  29. 18 11
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMVolumeDeleteResponse.java
  30. 79 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMVolumeSetOwnerResponse.java
  31. 55 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMVolumeSetQuotaResponse.java
  32. 22 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/package-info.java
  33. 28 50
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java
  34. 1 1
      hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
  35. 73 3
      hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java
  36. 265 0
      hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/TestOMVolumeCreateRequest.java
  37. 222 0
      hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/TestOMVolumeDeleteRequest.java
  38. 204 0
      hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/TestOMVolumeSetOwnerRequest.java
  39. 195 0
      hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/TestOMVolumeSetQuotaRequest.java
  40. 21 0
      hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/package-info.java
  41. 125 0
      hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeCreateResponse.java
  42. 130 0
      hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeDeleteResponse.java
  43. 142 0
      hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeSetOwnerResponse.java
  44. 117 0
      hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeSetQuotaResponse.java

+ 10 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/Table.java

@@ -20,6 +20,8 @@
 package org.apache.hadoop.utils.db;
 
 import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
 
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -131,6 +133,14 @@ public interface Table<KEY, VALUE> extends AutoCloseable {
     throw new NotImplementedException("cleanupCache is not implemented");
   }
 
+  /**
+   * Return cache iterator maintained for this table.
+   */
+  default Iterator<Map.Entry<CacheKey<KEY>, CacheValue<VALUE>>>
+      cacheIterator() {
+    throw new NotImplementedException("cacheIterator is not implemented");
+  }
+
   /**
    * Class used to represent the key and value pair of a db entry.
    */

+ 7 - 2
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/TypedTable.java

@@ -19,6 +19,8 @@
 package org.apache.hadoop.utils.db;
 
 import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.utils.db.cache.CacheKey;
@@ -82,7 +84,7 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
   @Override
   public boolean isExist(KEY key) throws IOException {
     CacheValue<VALUE> cacheValue= cache.get(new CacheKey<>(key));
-    return (cacheValue != null && cacheValue.getValue() != null) ||
+    return (cacheValue != null && cacheValue.getCacheValue() != null) ||
         rawTable.isExist(codecRegistry.asRawData(key));
   }
 
@@ -109,7 +111,7 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
       return getFromTable(key);
     } else {
       // We have a value in cache, return the value.
-      return cacheValue.getValue();
+      return cacheValue.getCacheValue();
     }
   }
 
@@ -156,6 +158,9 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
     cache.put(cacheKey, cacheValue);
   }
 
+  public Iterator<Map.Entry<CacheKey<KEY>, CacheValue<VALUE>>> cacheIterator() {
+    return cache.iterator();
+  }
 
   @Override
   public void cleanupCache(long epoch) {

+ 1 - 1
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/CacheKey.java

@@ -33,7 +33,7 @@ public class CacheKey<KEY> {
     this.key = key;
   }
 
-  public KEY getKey() {
+  public KEY getCacheKey() {
     return key;
   }
 

+ 1 - 1
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/CacheValue.java

@@ -36,7 +36,7 @@ public class CacheValue<VALUE> {
     this.epoch = epoch;
   }
 
-  public VALUE getValue() {
+  public VALUE getCacheValue() {
     return value.orNull();
   }
 

+ 6 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/PartialTableCache.java

@@ -20,6 +20,7 @@
 package org.apache.hadoop.utils.db.cache;
 
 import java.util.Iterator;
+import java.util.Map;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -77,6 +78,11 @@ public class PartialTableCache<CACHEKEY extends CacheKey,
     return cache.size();
   }
 
+  @Override
+  public Iterator<Map.Entry<CACHEKEY, CACHEVALUE>> iterator() {
+    return cache.entrySet().iterator();
+  }
+
   private void evictCache(long epoch) {
     EpochEntry<CACHEKEY> currentEntry = null;
     for (Iterator<EpochEntry<CACHEKEY>> iterator = epochEntries.iterator();

+ 9 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/cache/TableCache.java

@@ -22,6 +22,9 @@ package org.apache.hadoop.utils.db.cache;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 
+import java.util.Iterator;
+import java.util.Map;
+
 /**
  * Cache used for RocksDB tables.
  * @param <CACHEKEY>
@@ -60,4 +63,10 @@ public interface TableCache<CACHEKEY extends CacheKey,
    * @return size
    */
   int size();
+
+  /**
+   * Return an iterator for the cache.
+   * @return iterator of the underlying cache for the table.
+   */
+  Iterator<Map.Entry<CACHEKEY, CACHEVALUE>> iterator();
 }

+ 4 - 4
hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/cache/TestPartialTableCache.java

@@ -51,7 +51,7 @@ public class TestPartialTableCache {
 
     for (int i=0; i < 10; i++) {
       Assert.assertEquals(Integer.toString(i),
-          tableCache.get(new CacheKey<>(Integer.toString(i))).getValue());
+          tableCache.get(new CacheKey<>(Integer.toString(i))).getCacheValue());
     }
 
     // On a full table cache if some one calls cleanup it is a no-op.
@@ -59,7 +59,7 @@ public class TestPartialTableCache {
 
     for (int i=5; i < 10; i++) {
       Assert.assertEquals(Integer.toString(i),
-          tableCache.get(new CacheKey<>(Integer.toString(i))).getValue());
+          tableCache.get(new CacheKey<>(Integer.toString(i))).getCacheValue());
     }
   }
 
@@ -95,7 +95,7 @@ public class TestPartialTableCache {
     // Check we have first 10 entries in cache.
     for (int i=1; i <= 10; i++) {
       Assert.assertEquals(Integer.toString(i),
-          tableCache.get(new CacheKey<>(Integer.toString(i))).getValue());
+          tableCache.get(new CacheKey<>(Integer.toString(i))).getCacheValue());
     }
 
     int deleted = 5;
@@ -115,7 +115,7 @@ public class TestPartialTableCache {
     // Check if we have remaining entries.
     for (int i=6; i <= totalCount; i++) {
       Assert.assertEquals(Integer.toString(i),
-          tableCache.get(new CacheKey<>(Integer.toString(i))).getValue());
+          tableCache.get(new CacheKey<>(Integer.toString(i))).getCacheValue());
     }
 
     tableCache.cleanup(10);

+ 23 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeArgs.java

@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -154,6 +155,28 @@ public final class OmVolumeArgs extends WithMetadata implements Auditable {
     return auditMap;
   }
 
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    OmVolumeArgs that = (OmVolumeArgs) o;
+    return creationTime == that.creationTime &&
+        quotaInBytes == that.quotaInBytes &&
+        Objects.equals(adminName, that.adminName) &&
+        Objects.equals(ownerName, that.ownerName) &&
+        Objects.equals(volume, that.volume);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(adminName, ownerName, volume, creationTime,
+        quotaInBytes);
+  }
+
   /**
    * Builder for OmVolumeArgs.
    */

+ 1 - 1
hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto

@@ -287,7 +287,7 @@ message VolumeInfo {
     optional uint64 quotaInBytes = 4;
     repeated hadoop.hdds.KeyValue metadata = 5;
     repeated OzoneAclInfo volumeAcls = 6;
-    required uint64 creationTime = 7;
+    optional uint64 creationTime = 7;
 }
 
 /**

+ 1 - 1
hadoop-ozone/ozone-manager/pom.xml

@@ -53,7 +53,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
-      <version>2.2.0</version>
+      <version>2.28.2</version>
       <scope>test</scope>
     </dependency>
     <dependency>

+ 25 - 1
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java

@@ -20,7 +20,9 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.client.BlockID;
@@ -62,6 +64,9 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRE
 import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 
+import org.apache.hadoop.utils.db.TypedTable;
+import org.apache.hadoop.utils.db.cache.CacheKey;
+import org.apache.hadoop.utils.db.cache.CacheValue;
 import org.eclipse.jetty.util.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -451,10 +456,27 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
   public boolean isVolumeEmpty(String volume) throws IOException {
     String volumePrefix = getVolumeKey(volume + OM_KEY_PREFIX);
 
+      // First check in bucket table cache.
+    Iterator<Map.Entry<CacheKey<String>, CacheValue<OmBucketInfo>>> iterator =
+        ((TypedTable< String, OmBucketInfo>) bucketTable).cacheIterator();
+    while (iterator.hasNext()) {
+      Map.Entry< CacheKey< String >, CacheValue< OmBucketInfo > > entry =
+          iterator.next();
+      String key = entry.getKey().getCacheKey();
+      OmBucketInfo omBucketInfo = entry.getValue().getCacheValue();
+      // Making sure that entry is not for delete bucket request.
+      if (key.startsWith(volumePrefix) && omBucketInfo != null) {
+        return false;
+      }
+    }
+
     try (TableIterator<String, ? extends KeyValue<String, OmBucketInfo>>
         bucketIter = bucketTable.iterator()) {
       KeyValue<String, OmBucketInfo> kv = bucketIter.seek(volumePrefix);
-      if (kv != null && kv.getKey().startsWith(volumePrefix)) {
+      // During iteration from DB, check in mean time if this bucket is not
+      // marked for delete.
+      if (kv != null && kv.getKey().startsWith(volumePrefix) &&
+          bucketTable.get(kv.getKey()) != null) {
         return false; // we found at least one bucket with this volume prefix.
       }
     }
@@ -473,6 +495,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
   public boolean isBucketEmpty(String volume, String bucket)
       throws IOException {
     String keyPrefix = getBucketKey(volume, bucket);
+    //TODO: When Key ops are converted in to HA model, use cache also to
+    // determine bucket is empty or not.
     try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>> keyIter =
         keyTable.iterator()) {
       KeyValue<String, OmKeyInfo> kv = keyIter.seek(keyPrefix);

+ 16 - 2
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java

@@ -198,6 +198,8 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODE_ID_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_PORT_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME_DEFAULT;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_AUTH_METHOD;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
@@ -276,12 +278,20 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private static String keyProviderUriKeyName =
       CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
 
+  // Adding parameters needed for VolumeRequests here, so that during request
+  // execution, we can get from ozoneManager.
+  private long maxUserVolumeCount;
+
 
   private OzoneManager(OzoneConfiguration conf) throws IOException,
       AuthenticationException {
     super(OzoneVersionInfo.OZONE_VERSION_INFO);
     Preconditions.checkNotNull(conf);
     configuration = conf;
+    this.maxUserVolumeCount = conf.getInt(OZONE_OM_USER_MAX_VOLUME,
+        OZONE_OM_USER_MAX_VOLUME_DEFAULT);
+    Preconditions.checkArgument(this.maxUserVolumeCount > 0,
+        OZONE_OM_USER_MAX_VOLUME + " value should be greater than zero");
     omStorage = new OMStorage(conf);
     omId = omStorage.getOmId();
     if (omStorage.getState() != StorageState.INITIALIZED) {
@@ -3201,7 +3211,11 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     return null;
   }
 
-  public OMMetrics getOmMetrics() {
-    return metrics;
+  /**
+   * Return maximum volumes count per user.
+   * @return maxUserVolumeCount
+   */
+  public long getMaxUserVolumeCount() {
+    return maxUserVolumeCount;
   }
 }

+ 4 - 2
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java

@@ -142,11 +142,13 @@ public class OzoneManagerDoubleBuffer {
   }
 
   private void cleanupCache(long lastRatisTransactionIndex) {
-    // As now only bucket transactions are handled only called cleanupCache
-    // on bucketTable.
+    // As now only volume and bucket transactions are handled only called
+    // cleanupCache on bucketTable.
     // TODO: After supporting all write operations we need to call
     //  cleanupCache on the tables only when buffer has entries for that table.
     omMetadataManager.getBucketTable().cleanupCache(lastRatisTransactionIndex);
+    omMetadataManager.getVolumeTable().cleanupCache(lastRatisTransactionIndex);
+    omMetadataManager.getUserTable().cleanupCache(lastRatisTransactionIndex);
   }
 
   /**

+ 10 - 43
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java

@@ -27,7 +27,6 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis
     .ContainerStateMachine;
 import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -189,53 +188,21 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
   private TransactionContext handleStartTransactionRequests(
       RaftClientRequest raftClientRequest, OMRequest omRequest) {
 
-    OMRequest newOmRequest = null;
-    try {
-      switch (omRequest.getCmdType()) {
-      case CreateVolume:
-      case SetVolumeProperty:
-      case DeleteVolume:
-        newOmRequest = handler.handleStartTransaction(omRequest);
-        break;
-      case AllocateBlock:
-        return handleAllocateBlock(raftClientRequest, omRequest);
-      case CreateKey:
-        return handleCreateKeyRequest(raftClientRequest, omRequest);
-      case InitiateMultiPartUpload:
-        return handleInitiateMultipartUpload(raftClientRequest, omRequest);
-      default:
-        return TransactionContext.newBuilder()
-            .setClientRequest(raftClientRequest)
-            .setStateMachine(this)
-            .setServerRole(RaftProtos.RaftPeerRole.LEADER)
-            .setLogData(raftClientRequest.getMessage().getContent())
-            .build();
-      }
-    } catch (IOException ex) {
-      TransactionContext transactionContext = TransactionContext.newBuilder()
+    switch (omRequest.getCmdType()) {
+    case AllocateBlock:
+      return handleAllocateBlock(raftClientRequest, omRequest);
+    case CreateKey:
+      return handleCreateKeyRequest(raftClientRequest, omRequest);
+    case InitiateMultiPartUpload:
+      return handleInitiateMultipartUpload(raftClientRequest, omRequest);
+    default:
+      return TransactionContext.newBuilder()
           .setClientRequest(raftClientRequest)
           .setStateMachine(this)
           .setServerRole(RaftProtos.RaftPeerRole.LEADER)
+          .setLogData(raftClientRequest.getMessage().getContent())
           .build();
-      if (ex instanceof OMException) {
-        IOException ioException =
-            new IOException(ex.getMessage() + STATUS_CODE +
-                ((OMException) ex).getResult());
-        transactionContext.setException(ioException);
-      } else {
-        transactionContext.setException(ex);
-      }
-      LOG.error("Exception in startTransaction for cmdType " +
-          omRequest.getCmdType(), ex);
-      return transactionContext;
     }
-    TransactionContext transactionContext = TransactionContext.newBuilder()
-        .setClientRequest(raftClientRequest)
-        .setStateMachine(this)
-        .setServerRole(RaftProtos.RaftPeerRole.LEADER)
-        .setLogData(OMRatisHelper.convertRequestToByteString(newOmRequest))
-        .build();
-    return transactionContext;
   }
 
   private TransactionContext handleInitiateMultipartUpload(

+ 24 - 2
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java

@@ -17,11 +17,16 @@
 
 package org.apache.hadoop.ozone.om.ratis.utils;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.request.bucket.OMBucketCreateRequest;
 import org.apache.hadoop.ozone.om.request.bucket.OMBucketDeleteRequest;
 import org.apache.hadoop.ozone.om.request.bucket.OMBucketSetPropertyRequest;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.volume.OMVolumeCreateRequest;
+import org.apache.hadoop.ozone.om.request.volume.OMVolumeDeleteRequest;
+import org.apache.hadoop.ozone.om.request.volume.OMVolumeSetOwnerRequest;
+import org.apache.hadoop.ozone.om.request.volume.OMVolumeSetQuotaRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
@@ -43,10 +48,27 @@ public final class OzoneManagerRatisUtils {
    * @return OMClientRequest
    * @throws IOException
    */
-  public static OMClientRequest createClientRequest(OMRequest omRequest)
-      throws IOException {
+  public static OMClientRequest createClientRequest(OMRequest omRequest) {
     Type cmdType = omRequest.getCmdType();
     switch (cmdType) {
+    case CreateVolume:
+      return new OMVolumeCreateRequest(omRequest);
+    case SetVolumeProperty:
+      boolean hasQuota = omRequest.getSetVolumePropertyRequest()
+          .hasQuotaInBytes();
+      boolean hasOwner = omRequest.getSetVolumePropertyRequest().hasOwnerName();
+      Preconditions.checkState(hasOwner || hasQuota, "Either Quota or owner " +
+          "should be set in the SetVolumeProperty request");
+      Preconditions.checkState(!(hasOwner && hasQuota), "Either Quota or " +
+          "owner should be set in the SetVolumeProperty request. Should not " +
+          "set both");
+      if (hasQuota) {
+        return new OMVolumeSetQuotaRequest(omRequest);
+      } else {
+        return new OMVolumeSetOwnerRequest(omRequest);
+      }
+    case DeleteVolume:
+      return new OMVolumeDeleteRequest(omRequest);
     case CreateBucket:
       return new OMBucketCreateRequest(omRequest);
     case DeleteBucket:

+ 4 - 2
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java

@@ -50,7 +50,7 @@ import org.apache.hadoop.security.UserGroupInformation;
  */
 public abstract class OMClientRequest implements RequestAuditor {
 
-  private final OMRequest omRequest;
+  private OMRequest omRequest;
 
   public OMClientRequest(OMRequest omRequest) {
     Preconditions.checkNotNull(omRequest);
@@ -69,7 +69,8 @@ public abstract class OMClientRequest implements RequestAuditor {
    */
   public OMRequest preExecute(OzoneManager ozoneManager)
       throws IOException {
-    return getOmRequest().toBuilder().setUserInfo(getUserInfo()).build();
+    omRequest = getOmRequest().toBuilder().setUserInfo(getUserInfo()).build();
+    return omRequest;
   }
 
   /**
@@ -210,4 +211,5 @@ public abstract class OMClientRequest implements RequestAuditor {
     auditMap.put(OzoneConsts.VOLUME, volume);
     return auditMap;
   }
+
 }

+ 2 - 1
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java

@@ -97,8 +97,9 @@ public class OMBucketCreateRequest extends OMClientRequest {
     }
 
     newCreateBucketRequest.setBucketInfo(newBucketInfo.build());
+
     return getOmRequest().toBuilder().setUserInfo(getUserInfo())
-        .setCreateBucketRequest(newCreateBucketRequest.build()).build();
+       .setCreateBucketRequest(newCreateBucketRequest.build()).build();
   }
 
   @Override

+ 7 - 16
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketSetPropertyRequest.java

@@ -73,20 +73,15 @@ public class OMBucketSetPropertyRequest extends OMClientRequest {
   public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
       long transactionLogIndex) {
 
+
     SetBucketPropertyRequest setBucketPropertyRequest =
         getOmRequest().getSetBucketPropertyRequest();
 
     Preconditions.checkNotNull(setBucketPropertyRequest);
 
-    OMMetrics omMetrics = ozoneManager.getOmMetrics();
+    OMMetrics omMetrics = ozoneManager.getMetrics();
+    omMetrics.incNumBucketUpdates();
 
-    // This will never be null, on a real Ozone cluster. For tests this might
-    // be null. using mockito, to set omMetrics object, but still getting
-    // null. For now added this not null check.
-    //TODO: Removed not null check from here, once tests got fixed.
-    if (omMetrics != null) {
-      omMetrics.incNumBucketUpdates();
-    }
 
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
 
@@ -113,13 +108,11 @@ public class OMBucketSetPropertyRequest extends OMClientRequest {
             volumeName, bucketName, null);
       }
     } catch (IOException ex) {
-      if (omMetrics != null) {
-        omMetrics.incNumBucketUpdateFails();
-      }
-      auditLog(auditLogger, buildAuditMessage(OMAction.UPDATE_BUCKET,
-              omBucketArgs.toAuditMap(), ex, userInfo));
       LOG.error("Setting bucket property failed for bucket:{} in volume:{}",
           bucketName, volumeName, ex);
+      omMetrics.incNumBucketUpdateFails();
+      auditLog(auditLogger, buildAuditMessage(OMAction.UPDATE_BUCKET,
+              omBucketArgs.toAuditMap(), ex, userInfo));
       return new OMBucketSetPropertyResponse(omBucketInfo,
           createErrorOMResponse(omResponse, ex));
     }
@@ -204,11 +197,9 @@ public class OMBucketSetPropertyRequest extends OMClientRequest {
           SetBucketPropertyResponse.newBuilder().build());
       return new OMBucketSetPropertyResponse(omBucketInfo, omResponse.build());
     } else {
-      if (omMetrics != null) {
-        omMetrics.incNumBucketUpdateFails();
-      }
       LOG.error("Setting bucket property failed for bucket:{} in volume:{}",
           bucketName, volumeName, exception);
+      omMetrics.incNumBucketUpdateFails();
       return new OMBucketSetPropertyResponse(omBucketInfo,
           createErrorOMResponse(omResponse, exception));
     }

+ 196 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeCreateRequest.java

@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.volume;
+
+import java.io.IOException;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.response.volume.OMVolumeCreateResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.utils.db.cache.CacheKey;
+import org.apache.hadoop.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .CreateVolumeRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .CreateVolumeResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .VolumeInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .VolumeList;
+import org.apache.hadoop.util.Time;
+
+/**
+ * Handles volume create request.
+ */
+public class OMVolumeCreateRequest extends OMClientRequest
+    implements OMVolumeRequest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMVolumeCreateRequest.class);
+
+  public OMVolumeCreateRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+
+    VolumeInfo volumeInfo  =
+        getOmRequest().getCreateVolumeRequest().getVolumeInfo();
+
+    // Set creation time
+    VolumeInfo updatedVolumeInfo =
+        volumeInfo.toBuilder().setCreationTime(Time.now()).build();
+
+
+    return getOmRequest().toBuilder().setCreateVolumeRequest(
+        CreateVolumeRequest.newBuilder().setVolumeInfo(updatedVolumeInfo))
+        .setUserInfo(getUserInfo())
+        .build();
+
+  }
+
+  @Override
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+      long transactionLogIndex) {
+
+    CreateVolumeRequest createVolumeRequest =
+        getOmRequest().getCreateVolumeRequest();
+    Preconditions.checkNotNull(createVolumeRequest);
+    VolumeInfo volumeInfo = createVolumeRequest.getVolumeInfo();
+
+    OMMetrics omMetrics = ozoneManager.getMetrics();
+    omMetrics.incNumVolumeCreates();
+
+    String volume = volumeInfo.getVolume();
+    String owner = volumeInfo.getOwnerName();
+
+    OMResponse.Builder omResponse = OMResponse.newBuilder().setCmdType(
+        OzoneManagerProtocolProtos.Type.CreateVolume).setStatus(
+        OzoneManagerProtocolProtos.Status.OK).setSuccess(true);
+
+    OmVolumeArgs omVolumeArgs = null;
+
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+
+    AuditLogger auditLogger = ozoneManager.getAuditLogger();
+    OzoneManagerProtocolProtos.UserInfo userInfo = getOmRequest().getUserInfo();
+
+    // Doing this here, so we can do protobuf conversion outside of lock.
+    try {
+      omVolumeArgs = OmVolumeArgs.getFromProtobuf(volumeInfo);
+      // check Acl
+      if (ozoneManager.getAclsEnabled()) {
+        checkAcls(ozoneManager, OzoneObj.ResourceType.VOLUME,
+            OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.CREATE, volume,
+            null, null);
+      }
+    } catch (IOException ex) {
+      omMetrics.incNumVolumeCreateFails();
+      auditLog(auditLogger, buildAuditMessage(OMAction.CREATE_VOLUME,
+          buildVolumeAuditMap(volume), ex, userInfo));
+      LOG.error("Volume creation failed for user:{} volume:{}", owner, volume,
+          ex);
+      return new OMVolumeCreateResponse(omVolumeArgs, null,
+          createErrorOMResponse(omResponse, ex));
+    }
+
+
+
+    String dbUserKey = omMetadataManager.getUserKey(owner);
+    String dbVolumeKey = omMetadataManager.getVolumeKey(volume);
+    VolumeList volumeList = null;
+
+    // acquire lock.
+    omMetadataManager.getLock().acquireUserLock(owner);
+    omMetadataManager.getLock().acquireVolumeLock(volume);
+
+    IOException exception = null;
+    try {
+      OmVolumeArgs dbVolumeArgs =
+          omMetadataManager.getVolumeTable().get(dbVolumeKey);
+
+      // Validation: Check if volume already exists
+      if (dbVolumeArgs != null) {
+        LOG.debug("volume:{} already exists", omVolumeArgs.getVolume());
+        throw new OMException("Volume already exists",
+            OMException.ResultCodes.VOLUME_ALREADY_EXISTS);
+      }
+
+      volumeList = omMetadataManager.getUserTable().get(dbUserKey);
+      volumeList = addVolumeToOwnerList(volumeList,
+          volume, owner, ozoneManager.getMaxUserVolumeCount());
+
+      // Update cache: Update user and volume cache.
+      omMetadataManager.getUserTable().addCacheEntry(new CacheKey<>(dbUserKey),
+          new CacheValue<>(Optional.of(volumeList), transactionLogIndex));
+
+      omMetadataManager.getVolumeTable().addCacheEntry(
+          new CacheKey<>(dbVolumeKey),
+          new CacheValue<>(Optional.of(omVolumeArgs), transactionLogIndex));
+
+    } catch (IOException ex) {
+      exception = ex;
+    } finally {
+      omMetadataManager.getLock().releaseVolumeLock(volumeInfo.getVolume());
+      omMetadataManager.getLock().releaseUserLock(dbUserKey);
+    }
+
+    // Performing audit logging outside of the lock.
+    auditLog(auditLogger, buildAuditMessage(OMAction.CREATE_VOLUME,
+        omVolumeArgs.toAuditMap(), exception, userInfo));
+
+    // return response after releasing lock.
+    if (exception == null) {
+      LOG.debug("created volume:{} for user:{}", omVolumeArgs.getVolume(),
+          owner);
+      omMetrics.incNumVolumes();
+      omResponse.setCreateVolumeResponse(CreateVolumeResponse.newBuilder()
+          .build());
+      return new OMVolumeCreateResponse(omVolumeArgs, volumeList,
+          omResponse.build());
+    } else {
+      LOG.error("Volume creation failed for user:{} volume:{}", owner,
+          volumeInfo.getVolume(), exception);
+      omMetrics.incNumVolumeCreateFails();
+      return new OMVolumeCreateResponse(omVolumeArgs, volumeList,
+          createErrorOMResponse(omResponse, exception));
+    }
+  }
+
+
+}

+ 207 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeDeleteRequest.java

@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.volume;
+
+import java.io.IOException;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.response.volume.OMVolumeCreateResponse;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.volume.OMVolumeDeleteResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .DeleteVolumeRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .DeleteVolumeResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
+import org.apache.hadoop.utils.db.cache.CacheKey;
+import org.apache.hadoop.utils.db.cache.CacheValue;
+
+
+/**
+ * Handles volume delete request.
+ */
+public class OMVolumeDeleteRequest extends OMClientRequest
+    implements OMVolumeRequest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMVolumeDeleteRequest.class);
+
+  public OMVolumeDeleteRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+      long transactionLogIndex) {
+
+    DeleteVolumeRequest deleteVolumeRequest =
+        getOmRequest().getDeleteVolumeRequest();
+    Preconditions.checkNotNull(deleteVolumeRequest);
+
+    String volume = deleteVolumeRequest.getVolumeName();
+
+    OMMetrics omMetrics = ozoneManager.getMetrics();
+    omMetrics.incNumVolumeDeletes();
+
+    OMResponse.Builder omResponse = OMResponse.newBuilder().setCmdType(
+        OzoneManagerProtocolProtos.Type.DeleteVolume).setStatus(
+        OzoneManagerProtocolProtos.Status.OK).setSuccess(true);
+
+    AuditLogger auditLogger = ozoneManager.getAuditLogger();
+    OzoneManagerProtocolProtos.UserInfo userInfo = getOmRequest().getUserInfo();
+
+    try {
+      // check Acl
+      if (ozoneManager.getAclsEnabled()) {
+        checkAcls(ozoneManager, OzoneObj.ResourceType.VOLUME,
+            OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.DELETE, volume,
+            null, null);
+      }
+    } catch (IOException ex) {
+      LOG.error("Volume deletion failed for volume:{}", volume, ex);
+      omMetrics.incNumVolumeDeleteFails();
+      auditLog(auditLogger, buildAuditMessage(OMAction.DELETE_VOLUME,
+          buildVolumeAuditMap(volume), ex, userInfo));
+      return new OMVolumeCreateResponse(null, null,
+          createErrorOMResponse(omResponse, ex));
+    }
+
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+
+    OmVolumeArgs omVolumeArgs = null;
+    String owner = null;
+
+    omMetadataManager.getLock().acquireVolumeLock(volume);
+    try {
+      owner = getVolumeInfo(omMetadataManager, volume).getOwnerName();
+    } catch (IOException ex) {
+      LOG.error("Volume deletion failed for volume:{}", volume, ex);
+      omMetrics.incNumVolumeDeleteFails();
+      auditLog(auditLogger, buildAuditMessage(OMAction.DELETE_VOLUME,
+          buildVolumeAuditMap(volume), ex, userInfo));
+      return new OMVolumeDeleteResponse(null, null, null,
+          createErrorOMResponse(omResponse, ex));
+    } finally {
+      omMetadataManager.getLock().releaseVolumeLock(volume);
+    }
+
+    // Release and reacquire lock for now it will not be a problem for now, as
+    // applyTransaction serializes the operation's.
+    // TODO: Revisit this logic once HDDS-1672 checks in.
+
+    // We cannot acquire user lock holding volume lock, so released volume
+    // lock, and acquiring user and volume lock.
+
+    omMetadataManager.getLock().acquireUserLock(owner);
+    omMetadataManager.getLock().acquireVolumeLock(volume);
+
+    String dbUserKey = omMetadataManager.getUserKey(owner);
+    String dbVolumeKey = omMetadataManager.getVolumeKey(volume);
+
+    IOException exception = null;
+    OzoneManagerProtocolProtos.VolumeList newVolumeList = null;
+    try {
+      if (!omMetadataManager.isVolumeEmpty(volume)) {
+        LOG.debug("volume:{} is not empty", volume);
+        throw new OMException(OMException.ResultCodes.VOLUME_NOT_EMPTY);
+      }
+
+      newVolumeList = omMetadataManager.getUserTable().get(owner);
+
+      // delete the volume from the owner list
+      // as well as delete the volume entry
+      newVolumeList = delVolumeFromOwnerList(newVolumeList, volume, owner);
+
+      omMetadataManager.getUserTable().addCacheEntry(new CacheKey<>(dbUserKey),
+          new CacheValue<>(Optional.of(newVolumeList), transactionLogIndex));
+
+      omMetadataManager.getVolumeTable().addCacheEntry(
+          new CacheKey<>(dbVolumeKey), new CacheValue<>(Optional.absent(),
+              transactionLogIndex));
+
+    } catch (IOException ex) {
+      exception = ex;
+
+    } finally {
+      omMetadataManager.getLock().releaseVolumeLock(volume);
+      omMetadataManager.getLock().releaseUserLock(owner);
+    }
+
+    // Performing audit logging outside of the lock.
+    auditLog(auditLogger, buildAuditMessage(OMAction.DELETE_VOLUME,
+        buildVolumeAuditMap(volume), exception, userInfo));
+
+    // return response after releasing lock.
+    if (exception == null) {
+      LOG.debug("Volume deleted for user:{} volume:{}", owner, volume);
+      omMetrics.decNumVolumes();
+      omResponse.setDeleteVolumeResponse(
+          DeleteVolumeResponse.newBuilder().build());
+      return new OMVolumeDeleteResponse(volume, owner, newVolumeList,
+          omResponse.build());
+    } else {
+      LOG.error("Volume deletion failed for user:{} volume:{}",
+          owner, volume, exception);
+      omMetrics.incNumVolumeDeleteFails();
+      return new OMVolumeDeleteResponse(null, null, null,
+          createErrorOMResponse(omResponse, exception));
+    }
+
+  }
+
+  /**
+   * Return volume info for the specified volume. This method should be
+   * called after acquiring volume lock.
+   * @param omMetadataManager
+   * @param volume
+   * @return OmVolumeArgs
+   * @throws IOException
+   */
+  private OmVolumeArgs getVolumeInfo(OMMetadataManager omMetadataManager,
+      String volume) throws IOException {
+
+    String dbVolumeKey = omMetadataManager.getVolumeKey(volume);
+    OmVolumeArgs volumeArgs =
+        omMetadataManager.getVolumeTable().get(dbVolumeKey);
+    if (volumeArgs == null) {
+      throw new OMException("Volume " + volume + " is not found",
+          OMException.ResultCodes.VOLUME_NOT_FOUND);
+    }
+    return volumeArgs;
+
+  }
+}

+ 98 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeRequest.java

@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.volume;
+
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .VolumeList;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Defines common methods required for volume requests.
+ */
+public interface OMVolumeRequest {
+
+  /**
+   * Delete volume from user volume list. This method should be called after
+   * acquiring user lock.
+   * @param volumeList - current volume list owned by user.
+   * @param volume - volume which needs to deleted from the volume list.
+   * @param owner
+   * @return VolumeList - updated volume list for the user.
+   * @throws IOException
+   */
+  default VolumeList delVolumeFromOwnerList(VolumeList volumeList,
+      String volume, String owner) throws IOException {
+
+    List<String> prevVolList = new ArrayList<>();
+
+    if (volumeList != null) {
+      prevVolList.addAll(volumeList.getVolumeNamesList());
+    } else {
+      // No Volumes for this user
+      throw new OMException("User not found: " + owner,
+          OMException.ResultCodes.USER_NOT_FOUND);
+    }
+
+    // Remove the volume from the list
+    prevVolList.remove(volume);
+    VolumeList newVolList = VolumeList.newBuilder()
+        .addAllVolumeNames(prevVolList).build();
+    return newVolList;
+  }
+
+
+  /**
+   * Add volume to user volume list. This method should be called after
+   * acquiring user lock.
+   * @param volumeList - current volume list owned by user.
+   * @param volume - volume which needs to be added to this list.
+   * @param owner
+   * @param maxUserVolumeCount
+   * @return VolumeList - which is updated volume list.
+   * @throws OMException - if user has volumes greater than
+   * maxUserVolumeCount, an exception is thrown.
+   */
+  default VolumeList addVolumeToOwnerList(
+      VolumeList volumeList, String volume, String owner,
+      long maxUserVolumeCount) throws IOException {
+
+    // Check the volume count
+    if (volumeList != null &&
+        volumeList.getVolumeNamesList().size() >= maxUserVolumeCount) {
+      throw new OMException("Too many volumes for user:" + owner,
+          OMException.ResultCodes.USER_TOO_MANY_VOLUMES);
+    }
+
+    List<String> prevVolList = new ArrayList<>();
+    if (volumeList != null) {
+      prevVolList.addAll(volumeList.getVolumeNamesList());
+    }
+
+    // Add the new volume to the list
+    prevVolList.add(volume);
+    VolumeList newVolList = VolumeList.newBuilder()
+        .addAllVolumeNames(prevVolList).build();
+
+    return newVolList;
+  }
+}

+ 212 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetOwnerRequest.java

@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.volume;
+
+import java.io.IOException;
+import java.util.Map;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.volume.OMVolumeCreateResponse;
+import org.apache.hadoop.ozone.om.response.volume.OMVolumeSetOwnerResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .SetVolumePropertyRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .SetVolumePropertyResponse;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.utils.db.cache.CacheKey;
+import org.apache.hadoop.utils.db.cache.CacheValue;
+
+/**
+ * Handle set owner request for volume.
+ */
+public class OMVolumeSetOwnerRequest extends OMClientRequest
+    implements OMVolumeRequest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMVolumeSetOwnerRequest.class);
+
+  public OMVolumeSetOwnerRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+      long transactionLogIndex) {
+
+    SetVolumePropertyRequest setVolumePropertyRequest =
+        getOmRequest().getSetVolumePropertyRequest();
+
+    Preconditions.checkNotNull(setVolumePropertyRequest);
+
+    OMResponse.Builder omResponse = OMResponse.newBuilder().setCmdType(
+        OzoneManagerProtocolProtos.Type.SetVolumeProperty).setStatus(
+        OzoneManagerProtocolProtos.Status.OK).setSuccess(true);
+
+    // In production this will never happen, this request will be called only
+    // when we have ownerName in setVolumePropertyRequest.
+    if (!setVolumePropertyRequest.hasOwnerName()) {
+      omResponse.setStatus(OzoneManagerProtocolProtos.Status.INVALID_REQUEST)
+          .setSuccess(false);
+      return new OMVolumeSetOwnerResponse(null, null, null, null,
+          omResponse.build());
+    }
+
+    OMMetrics omMetrics = ozoneManager.getMetrics();
+    omMetrics.incNumVolumeUpdates();
+    String volume = setVolumePropertyRequest.getVolumeName();
+    String newOwner = setVolumePropertyRequest.getOwnerName();
+
+    AuditLogger auditLogger = ozoneManager.getAuditLogger();
+    OzoneManagerProtocolProtos.UserInfo userInfo = getOmRequest().getUserInfo();
+
+    Map<String, String> auditMap = buildVolumeAuditMap(volume);
+    auditMap.put(OzoneConsts.OWNER, newOwner);
+    try {
+      // check Acl
+      if (ozoneManager.getAclsEnabled()) {
+        checkAcls(ozoneManager, OzoneObj.ResourceType.VOLUME,
+            OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE_ACL,
+            volume, null, null);
+      }
+    } catch (IOException ex) {
+      LOG.error("Changing volume ownership failed for user:{} volume:{}",
+          newOwner, volume);
+      omMetrics.incNumVolumeUpdateFails();
+      auditLog(auditLogger, buildAuditMessage(OMAction.SET_OWNER, auditMap,
+          ex, userInfo));
+      return new OMVolumeCreateResponse(null, null,
+          createErrorOMResponse(omResponse, ex));
+    }
+
+
+    long maxUserVolumeCount = ozoneManager.getMaxUserVolumeCount();
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    String dbVolumeKey = omMetadataManager.getVolumeKey(volume);
+    String oldOwner = null;
+    OzoneManagerProtocolProtos.VolumeList oldOwnerVolumeList = null;
+    OzoneManagerProtocolProtos.VolumeList newOwnerVolumeList = null;
+    OmVolumeArgs omVolumeArgs = null;
+    IOException exception = null;
+
+    omMetadataManager.getLock().acquireUserLock(newOwner);
+    omMetadataManager.getLock().acquireVolumeLock(volume);
+
+    boolean needToreleaseOldOwnerLock = false;
+    try {
+      omVolumeArgs = omMetadataManager.getVolumeTable().get(dbVolumeKey);
+
+      if (omVolumeArgs == null) {
+        LOG.debug("Changing volume ownership failed for user:{} volume:{}",
+            newOwner, volume);
+        throw new OMException("Volume " + volume + " is not found",
+            OMException.ResultCodes.VOLUME_NOT_FOUND);
+      }
+
+      oldOwner = omVolumeArgs.getOwnerName();
+
+
+      // Release and reacquire lock for now it will not be a problem, as
+      // applyTransaction serializes the operation's.
+      // TODO: Revisit this logic once HDDS-1672 checks in.
+
+      // releasing volume lock, as to acquire user lock we need to release
+      // volume lock.
+      omMetadataManager.getLock().releaseVolumeLock(volume);
+      omMetadataManager.getLock().acquireUserLock(oldOwner);
+      omMetadataManager.getLock().acquireVolumeLock(volume);
+
+      needToreleaseOldOwnerLock = true;
+      oldOwnerVolumeList =
+          omMetadataManager.getUserTable().get(oldOwner);
+
+      oldOwnerVolumeList = delVolumeFromOwnerList(
+          oldOwnerVolumeList, volume, oldOwner);
+
+
+      newOwnerVolumeList = omMetadataManager.getUserTable().get(newOwner);
+      newOwnerVolumeList = addVolumeToOwnerList(
+          newOwnerVolumeList, volume, newOwner, maxUserVolumeCount);
+
+      // Set owner with new owner name.
+      omVolumeArgs.setOwnerName(newOwner);
+
+      // Update cache.
+      omMetadataManager.getUserTable().addCacheEntry(
+          new CacheKey<>(omMetadataManager.getUserKey(newOwner)),
+              new CacheValue<>(Optional.of(newOwnerVolumeList),
+                  transactionLogIndex));
+      omMetadataManager.getUserTable().addCacheEntry(
+          new CacheKey<>(omMetadataManager.getUserKey(oldOwner)),
+          new CacheValue<>(Optional.of(oldOwnerVolumeList),
+              transactionLogIndex));
+      omMetadataManager.getVolumeTable().addCacheEntry(
+          new CacheKey<>(dbVolumeKey),
+          new CacheValue<>(Optional.of(omVolumeArgs), transactionLogIndex));
+
+    } catch (IOException ex) {
+      exception = ex;
+    } finally {
+      omMetadataManager.getLock().releaseVolumeLock(volume);
+      omMetadataManager.getLock().releaseUserLock(newOwner);
+      if (needToreleaseOldOwnerLock) {
+        omMetadataManager.getLock().releaseUserLock(oldOwner);
+      }
+    }
+
+    // Performing audit logging outside of the lock.
+    auditLog(auditLogger, buildAuditMessage(OMAction.SET_OWNER, auditMap,
+        exception, userInfo));
+
+    // return response after releasing lock.
+    if (exception == null) {
+      LOG.debug("Successfully changed Owner of Volume {} from {} -> {}", volume,
+          oldOwner, newOwner);
+      omResponse.setSetVolumePropertyResponse(
+          SetVolumePropertyResponse.newBuilder().build());
+      return new OMVolumeSetOwnerResponse(oldOwner, oldOwnerVolumeList,
+          newOwnerVolumeList, omVolumeArgs, omResponse.build());
+    } else {
+      LOG.error("Changing volume ownership failed for user:{} volume:{}",
+          newOwner, volume, exception);
+      omMetrics.incNumVolumeUpdateFails();
+      return new OMVolumeSetOwnerResponse(null, null, null, null,
+          createErrorOMResponse(omResponse, exception));
+    }
+  }
+}

+ 166 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetQuotaRequest.java

@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.volume;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.volume.OMVolumeSetQuotaResponse;
+import org.apache.hadoop.ozone.om.response.volume.OMVolumeCreateResponse;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .SetVolumePropertyRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .SetVolumePropertyResponse;
+import org.apache.hadoop.utils.db.cache.CacheKey;
+import org.apache.hadoop.utils.db.cache.CacheValue;
+
+
+/**
+ * Handles set Quota request for volume.
+ */
+public class OMVolumeSetQuotaRequest extends OMClientRequest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMVolumeSetQuotaRequest.class);
+
+  public OMVolumeSetQuotaRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+      long transactionLogIndex) {
+
+    SetVolumePropertyRequest setVolumePropertyRequest =
+        getOmRequest().getSetVolumePropertyRequest();
+
+    Preconditions.checkNotNull(setVolumePropertyRequest);
+
+    OMResponse.Builder omResponse = OMResponse.newBuilder().setCmdType(
+        OzoneManagerProtocolProtos.Type.SetVolumeProperty).setStatus(
+        OzoneManagerProtocolProtos.Status.OK).setSuccess(true);
+
+
+
+    // In production this will never happen, this request will be called only
+    // when we have quota in bytes is set in setVolumePropertyRequest.
+    if (!setVolumePropertyRequest.hasQuotaInBytes()) {
+      omResponse.setStatus(OzoneManagerProtocolProtos.Status.INVALID_REQUEST)
+          .setSuccess(false);
+      return new OMVolumeSetQuotaResponse(null,
+          omResponse.build());
+    }
+
+    String volume = setVolumePropertyRequest.getVolumeName();
+    OMMetrics omMetrics = ozoneManager.getMetrics();
+    omMetrics.incNumVolumeUpdates();
+
+    AuditLogger auditLogger = ozoneManager.getAuditLogger();
+    OzoneManagerProtocolProtos.UserInfo userInfo = getOmRequest().getUserInfo();
+    Map<String, String> auditMap = buildVolumeAuditMap(volume);
+    auditMap.put(OzoneConsts.QUOTA,
+        String.valueOf(setVolumePropertyRequest.getQuotaInBytes()));
+
+    try {
+      // check Acl
+      if (ozoneManager.getAclsEnabled()) {
+        checkAcls(ozoneManager, OzoneObj.ResourceType.VOLUME,
+            OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE, volume,
+            null, null);
+      }
+    } catch (IOException ex) {
+      LOG.error("Changing volume quota failed for volume:{} quota:{}", volume,
+          setVolumePropertyRequest.getQuotaInBytes(), ex);
+      omMetrics.incNumVolumeUpdateFails();
+      auditLog(auditLogger, buildAuditMessage(OMAction.SET_QUOTA, auditMap,
+          ex, userInfo));
+      return new OMVolumeCreateResponse(null, null,
+          createErrorOMResponse(omResponse, ex));
+    }
+
+
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+
+    IOException exception = null;
+    OmVolumeArgs omVolumeArgs = null;
+
+    omMetadataManager.getLock().acquireVolumeLock(volume);
+    try {
+      String dbVolumeKey = omMetadataManager.getVolumeKey(volume);
+      omVolumeArgs = omMetadataManager.getVolumeTable().get(dbVolumeKey);
+
+      if (omVolumeArgs == null) {
+        LOG.debug("volume:{} does not exist", volume);
+        throw new OMException(OMException.ResultCodes.VOLUME_NOT_FOUND);
+      }
+
+      omVolumeArgs.setQuotaInBytes(setVolumePropertyRequest.getQuotaInBytes());
+
+      // update cache.
+      omMetadataManager.getVolumeTable().addCacheEntry(
+          new CacheKey<>(dbVolumeKey),
+          new CacheValue<>(Optional.of(omVolumeArgs), transactionLogIndex));
+
+    } catch (IOException ex) {
+      exception = ex;
+    } finally {
+      omMetadataManager.getLock().releaseVolumeLock(volume);
+    }
+
+    // Performing audit logging outside of the lock.
+    auditLog(auditLogger, buildAuditMessage(OMAction.SET_QUOTA, auditMap,
+        exception, userInfo));
+
+    // return response after releasing lock.
+    if (exception == null) {
+      omResponse.setSetVolumePropertyResponse(
+          SetVolumePropertyResponse.newBuilder().build());
+      return new OMVolumeSetQuotaResponse(omVolumeArgs, omResponse.build());
+    } else {
+      omMetrics.incNumVolumeUpdateFails();
+      LOG.error("Changing volume quota failed for volume:{} quota:{}", volume,
+          setVolumePropertyRequest.getQuotaInBytes(), exception);
+      return new OMVolumeSetQuotaResponse(null,
+          createErrorOMResponse(omResponse, exception));
+    }
+  }
+
+
+}

+ 22 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/package-info.java

@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package contains classes related to volume requests.
+ */
+package org.apache.hadoop.ozone.om.request.volume;

+ 2 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/bucket/OMBucketCreateResponse.java

@@ -45,6 +45,8 @@ public final class OMBucketCreateResponse extends OMClientResponse {
   public void addToDBBatch(OMMetadataManager omMetadataManager,
       BatchOperation batchOperation) throws IOException {
 
+    // For OmResponse with failure, this should do nothing. This method is
+    // not called in failure scenario in OM code.
     if (getOMResponse().getStatus() == OzoneManagerProtocolProtos.Status.OK) {
       String dbBucketKey =
           omMetadataManager.getBucketKey(omBucketInfo.getVolumeName(),

+ 3 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/bucket/OMBucketDeleteResponse.java

@@ -44,6 +44,9 @@ public final class OMBucketDeleteResponse extends OMClientResponse {
   @Override
   public void addToDBBatch(OMMetadataManager omMetadataManager,
       BatchOperation batchOperation) throws IOException {
+
+    // For OmResponse with failure, this should do nothing. This method is
+    // not called in failure scenario in OM code.
     if (getOMResponse().getStatus() == OzoneManagerProtocolProtos.Status.OK) {
       String dbBucketKey =
           omMetadataManager.getBucketKey(volumeName, bucketName);

+ 3 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/bucket/OMBucketSetPropertyResponse.java

@@ -42,6 +42,9 @@ public class OMBucketSetPropertyResponse extends OMClientResponse {
   @Override
   public void addToDBBatch(OMMetadataManager omMetadataManager,
       BatchOperation batchOperation) throws IOException {
+
+    // For OmResponse with failure, this should do nothing. This method is
+    // not called in failure scenario in OM code.
     if (getOMResponse().getStatus() == OzoneManagerProtocolProtos.Status.OK) {
       String dbBucketKey =
           omMetadataManager.getBucketKey(omBucketInfo.getVolumeName(),

+ 17 - 13
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMVolumeCreateResponse.java → hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMVolumeCreateResponse.java

@@ -16,12 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.ozone.om.response;
+package org.apache.hadoop.ozone.om.response.volume;
 
 import java.io.IOException;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -47,21 +50,22 @@ public class OMVolumeCreateResponse extends OMClientResponse {
   public void addToDBBatch(OMMetadataManager omMetadataManager,
       BatchOperation batchOperation) throws IOException {
 
-    String dbVolumeKey =
-        omMetadataManager.getVolumeKey(omVolumeArgs.getVolume());
-    String dbUserKey =
-        omMetadataManager.getUserKey(omVolumeArgs.getOwnerName());
+    // For OmResponse with failure, this should do nothing. This method is
+    // not called in failure scenario in OM code.
+    if (getOMResponse().getStatus() == OzoneManagerProtocolProtos.Status.OK) {
+      String dbVolumeKey =
+          omMetadataManager.getVolumeKey(omVolumeArgs.getVolume());
+      String dbUserKey =
+          omMetadataManager.getUserKey(omVolumeArgs.getOwnerName());
 
-    omMetadataManager.getVolumeTable().putWithBatch(batchOperation, dbVolumeKey,
-        omVolumeArgs);
-    omMetadataManager.getUserTable().putWithBatch(batchOperation, dbUserKey,
-        volumeList);
-  }
-
-  public VolumeList getVolumeList() {
-    return volumeList;
+      omMetadataManager.getVolumeTable().putWithBatch(batchOperation,
+          dbVolumeKey, omVolumeArgs);
+      omMetadataManager.getUserTable().putWithBatch(batchOperation, dbUserKey,
+          volumeList);
+    }
   }
 
+  @VisibleForTesting
   public OmVolumeArgs getOmVolumeArgs() {
     return omVolumeArgs;
   }

+ 18 - 11
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMVolumeDeleteResponse.java → hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMVolumeDeleteResponse.java

@@ -16,11 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.ozone.om.response;
+package org.apache.hadoop.ozone.om.response.volume;
 
 import java.io.IOException;
 
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -46,17 +48,22 @@ public class OMVolumeDeleteResponse extends OMClientResponse {
   @Override
   public void addToDBBatch(OMMetadataManager omMetadataManager,
       BatchOperation batchOperation) throws IOException {
-    String dbUserKey = omMetadataManager.getUserKey(owner);
-    VolumeList volumeList = updatedVolumeList;
-    if (updatedVolumeList.getVolumeNamesList().size() == 0) {
-      omMetadataManager.getUserTable().deleteWithBatch(batchOperation,
-          dbUserKey);
-    } else {
-      omMetadataManager.getUserTable().putWithBatch(batchOperation, dbUserKey,
-          volumeList);
+
+    // For OmResponse with failure, this should do nothing. This method is
+    // not called in failure scenario in OM code.
+    if (getOMResponse().getStatus() == OzoneManagerProtocolProtos.Status.OK) {
+      String dbUserKey = omMetadataManager.getUserKey(owner);
+      VolumeList volumeList = updatedVolumeList;
+      if (updatedVolumeList.getVolumeNamesList().size() == 0) {
+        omMetadataManager.getUserTable().deleteWithBatch(batchOperation,
+            dbUserKey);
+      } else {
+        omMetadataManager.getUserTable().putWithBatch(batchOperation, dbUserKey,
+            volumeList);
+      }
+      omMetadataManager.getVolumeTable().deleteWithBatch(batchOperation,
+          omMetadataManager.getVolumeKey(volume));
     }
-    omMetadataManager.getVolumeTable().deleteWithBatch(batchOperation,
-        omMetadataManager.getVolumeKey(volume));
   }
 
 }

+ 79 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMVolumeSetOwnerResponse.java

@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.volume;
+
+import java.io.IOException;
+
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .VolumeList;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
+import org.apache.hadoop.utils.db.BatchOperation;
+
+/**
+ * Response for set owner request.
+ */
+public class OMVolumeSetOwnerResponse extends OMClientResponse {
+
+  private String oldOwner;
+  private VolumeList oldOwnerVolumeList;
+  private VolumeList newOwnerVolumeList;
+  private OmVolumeArgs newOwnerVolumeArgs;
+
+  public OMVolumeSetOwnerResponse(String oldOwner,
+      VolumeList oldOwnerVolumeList, VolumeList newOwnerVolumeList,
+      OmVolumeArgs newOwnerVolumeArgs, OMResponse omResponse) {
+    super(omResponse);
+    this.oldOwner = oldOwner;
+    this.oldOwnerVolumeList = oldOwnerVolumeList;
+    this.newOwnerVolumeList = newOwnerVolumeList;
+    this.newOwnerVolumeArgs = newOwnerVolumeArgs;
+  }
+
+  public void addToDBBatch(OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+
+    // For OmResponse with failure, this should do nothing. This method is
+    // not called in failure scenario in OM code.
+    if (getOMResponse().getStatus() == OzoneManagerProtocolProtos.Status.OK) {
+      String oldOwnerKey = omMetadataManager.getUserKey(oldOwner);
+      String newOwnerKey =
+          omMetadataManager.getUserKey(newOwnerVolumeArgs.getOwnerName());
+      if (oldOwnerVolumeList.getVolumeNamesList().size() == 0) {
+        omMetadataManager.getUserTable().deleteWithBatch(batchOperation,
+            oldOwnerKey);
+      } else {
+        omMetadataManager.getUserTable().putWithBatch(batchOperation,
+            oldOwnerKey, oldOwnerVolumeList);
+      }
+      omMetadataManager.getUserTable().putWithBatch(batchOperation, newOwnerKey,
+          newOwnerVolumeList);
+
+      String dbVolumeKey =
+          omMetadataManager.getVolumeKey(newOwnerVolumeArgs.getVolume());
+      omMetadataManager.getVolumeTable().putWithBatch(batchOperation,
+          dbVolumeKey, newOwnerVolumeArgs);
+    }
+  }
+}

+ 55 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/OMVolumeSetQuotaResponse.java

@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.volume;
+
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
+import org.apache.hadoop.utils.db.BatchOperation;
+
+import java.io.IOException;
+
+/**
+ * Response for set quota request.
+ */
+public class OMVolumeSetQuotaResponse extends OMClientResponse {
+  private OmVolumeArgs omVolumeArgs;
+
+  public OMVolumeSetQuotaResponse(OmVolumeArgs omVolumeArgs,
+      OMResponse omResponse) {
+    super(omResponse);
+    this.omVolumeArgs = omVolumeArgs;
+  }
+
+  @Override
+  public void addToDBBatch(OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+
+    // For OmResponse with failure, this should do nothing. This method is
+    // not called in failure scenario in OM code.
+    if (getOMResponse().getStatus() == OzoneManagerProtocolProtos.Status.OK) {
+      omMetadataManager.getVolumeTable().putWithBatch(batchOperation,
+          omMetadataManager.getVolumeKey(omVolumeArgs.getVolume()),
+          omVolumeArgs);
+    }
+  }
+}

+ 22 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/volume/package-info.java

@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package contains classes related to volume requests.
+ */
+package org.apache.hadoop.ozone.om.response.volume;

+ 28 - 50
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java

@@ -96,58 +96,36 @@ public class OzoneManagerHARequestHandlerImpl
       long transactionLogIndex) {
     LOG.debug("Received OMRequest: {}, ", omRequest);
     Type cmdType = omRequest.getCmdType();
-    OMResponse.Builder responseBuilder =
-        OMResponse.newBuilder().setCmdType(cmdType)
-            .setStatus(Status.OK);
-    try {
-      switch (cmdType) {
-      case CreateVolume:
-        responseBuilder.setCreateVolumeResponse(
-            handleCreateVolumeApply(omRequest));
-        break;
-      case SetVolumeProperty:
-        responseBuilder.setSetVolumePropertyResponse(
-            handleSetVolumePropertyApply(omRequest));
-        break;
-      case DeleteVolume:
-        responseBuilder.setDeleteVolumeResponse(
-            handleDeleteVolumeApply(omRequest));
-        break;
-      case CreateBucket:
-      case DeleteBucket:
-      case SetBucketProperty:
-        //TODO: We don't need to pass transactionID, this will be removed when
-        // complete write requests is changed to new model. And also we can
-        // return OMClientResponse, then adding to doubleBuffer can be taken
-        // care by stateMachine. And also integrate both HA and NON HA code
-        // paths.
-        OMClientRequest omClientRequest =
-            OzoneManagerRatisUtils.createClientRequest(omRequest);
-        OMClientResponse omClientResponse =
-            omClientRequest.validateAndUpdateCache(getOzoneManager(),
-                transactionLogIndex);
-
-        // If any error we have got when validateAndUpdateCache, OMResponse
-        // Status is set with Error Code other than OK, in that case don't
-        // add this to double buffer.
-        if (omClientResponse.getOMResponse().getStatus() == Status.OK) {
-          ozoneManagerDoubleBuffer.add(omClientResponse, transactionLogIndex);
-        }
-        return omClientResponse.getOMResponse();
-      default:
-        // As all request types are not changed so we need to call handle
-        // here.
-        return handle(omRequest);
-      }
-      responseBuilder.setSuccess(true);
-    } catch (IOException ex) {
-      responseBuilder.setSuccess(false);
-      responseBuilder.setStatus(exceptionToResponseStatus(ex));
-      if (ex.getMessage() != null) {
-        responseBuilder.setMessage(ex.getMessage());
+    switch (cmdType) {
+    case CreateVolume:
+    case SetVolumeProperty:
+    case DeleteVolume:
+    case CreateBucket:
+    case DeleteBucket:
+    case SetBucketProperty:
+      //TODO: We don't need to pass transactionID, this will be removed when
+      // complete write requests is changed to new model. And also we can
+      // return OMClientResponse, then adding to doubleBuffer can be taken
+      // care by stateMachine. And also integrate both HA and NON HA code
+      // paths.
+      OMClientRequest omClientRequest =
+          OzoneManagerRatisUtils.createClientRequest(omRequest);
+      OMClientResponse omClientResponse =
+          omClientRequest.validateAndUpdateCache(getOzoneManager(),
+              transactionLogIndex);
+
+      // If any error we have got when validateAndUpdateCache, OMResponse
+      // Status is set with Error Code other than OK, in that case don't
+      // add this to double buffer.
+      if (omClientResponse.getOMResponse().getStatus() == Status.OK) {
+        ozoneManagerDoubleBuffer.add(omClientResponse, transactionLogIndex);
       }
+      return omClientResponse.getOMResponse();
+    default:
+      // As all request types are not changed so we need to call handle
+      // here.
+      return handle(omRequest);
     }
-    return responseBuilder.build();
   }
 
 

+ 1 - 1
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java

@@ -37,7 +37,7 @@ import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
-import org.apache.hadoop.ozone.om.response.OMVolumeCreateResponse;
+import org.apache.hadoop.ozone.om.response.volume.OMVolumeCreateResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .CreateBucketResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos

+ 73 - 3
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java

@@ -19,6 +19,7 @@
 
 package org.apache.hadoop.ozone.om.request;
 
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -28,6 +29,10 @@ import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .SetVolumePropertyRequest;
 import org.apache.hadoop.util.Time;
 
 /**
@@ -60,21 +65,34 @@ public final class TestOMRequestUtils {
   }
 
   /**
-   * Add's volume creation entry to OM DB.
+   * Add volume creation entry to OM DB.
    * @param volumeName
    * @param omMetadataManager
    * @throws Exception
    */
   public static void addVolumeToDB(String volumeName,
       OMMetadataManager omMetadataManager) throws Exception {
+    addVolumeToDB(volumeName, UUID.randomUUID().toString(), omMetadataManager);
+  }
+
+  /**
+   * Add volume creation entry to OM DB.
+   * @param volumeName
+   * @param ownerName
+   * @param omMetadataManager
+   * @throws Exception
+   */
+  public static void addVolumeToDB(String volumeName, String ownerName,
+      OMMetadataManager omMetadataManager) throws Exception {
     OmVolumeArgs omVolumeArgs =
         OmVolumeArgs.newBuilder().setCreationTime(Time.now())
-            .setVolume(volumeName).setAdminName(UUID.randomUUID().toString())
-            .setOwnerName(UUID.randomUUID().toString()).build();
+            .setVolume(volumeName).setAdminName(ownerName)
+            .setOwnerName(ownerName).build();
     omMetadataManager.getVolumeTable().put(
         omMetadataManager.getVolumeKey(volumeName), omVolumeArgs);
   }
 
+
   public static OzoneManagerProtocolProtos.OMRequest createBucketRequest(
       String bucketName, String volumeName, boolean isVersionEnabled,
       OzoneManagerProtocolProtos.StorageTypeProto storageTypeProto) {
@@ -103,4 +121,56 @@ public final class TestOMRequestUtils {
     return metadataList;
   }
 
+
+  /**
+   * Add user to user table.
+   * @param volumeName
+   * @param ownerName
+   * @param omMetadataManager
+   * @throws Exception
+   */
+  public static void addUserToDB(String volumeName, String ownerName,
+      OMMetadataManager omMetadataManager) throws Exception {
+    OzoneManagerProtocolProtos.VolumeList volumeList =
+        OzoneManagerProtocolProtos.VolumeList.newBuilder()
+            .addVolumeNames(volumeName).build();
+    omMetadataManager.getUserTable().put(
+        omMetadataManager.getUserKey(ownerName), volumeList);
+  }
+
+  /**
+   * Create OMRequest for set volume property request with owner set.
+   * @param volumeName
+   * @param newOwner
+   * @return OMRequest
+   */
+  public static OMRequest createSetVolumePropertyRequest(String volumeName,
+      String newOwner) {
+    SetVolumePropertyRequest setVolumePropertyRequest =
+        SetVolumePropertyRequest.newBuilder().setVolumeName(volumeName)
+            .setOwnerName(newOwner).build();
+
+    return OMRequest.newBuilder().setClientId(UUID.randomUUID().toString())
+        .setCmdType(OzoneManagerProtocolProtos.Type.SetVolumeProperty)
+        .setSetVolumePropertyRequest(setVolumePropertyRequest).build();
+  }
+
+
+  /**
+   * Create OMRequest for set volume property request with quota set.
+   * @param volumeName
+   * @param quota
+   * @return OMRequest
+   */
+  public static OMRequest createSetVolumePropertyRequest(String volumeName,
+      long quota) {
+    SetVolumePropertyRequest setVolumePropertyRequest =
+        SetVolumePropertyRequest.newBuilder().setVolumeName(volumeName)
+            .setQuotaInBytes(quota).build();
+
+    return OMRequest.newBuilder().setClientId(UUID.randomUUID().toString())
+        .setCmdType(OzoneManagerProtocolProtos.Type.SetVolumeProperty)
+        .setSetVolumePropertyRequest(setVolumePropertyRequest).build();
+  }
+
 }

+ 265 - 0
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/TestOMVolumeCreateRequest.java

@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.volume;
+
+import java.util.UUID;
+
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.audit.AuditMessage;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .CreateVolumeRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .VolumeInfo;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests create volume request.
+ */
+public class TestOMVolumeCreateRequest {
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  private OzoneManager ozoneManager;
+  private OMMetrics omMetrics;
+  private OMMetadataManager omMetadataManager;
+  private AuditLogger auditLogger;
+
+
+  @Before
+  public void setup() throws Exception {
+    ozoneManager = Mockito.mock(OzoneManager.class);
+    omMetrics = OMMetrics.create();
+    OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
+    ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
+        folder.newFolder().getAbsolutePath());
+    omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration);
+    when(ozoneManager.getMetrics()).thenReturn(omMetrics);
+    when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
+    when(ozoneManager.getMaxUserVolumeCount()).thenReturn(10L);
+    auditLogger = Mockito.mock(AuditLogger.class);
+    when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
+    Mockito.doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
+  }
+
+  @After
+  public void stop() {
+    omMetrics.unRegister();
+    Mockito.framework().clearInlineMocks();
+  }
+
+  @Test
+  public void testPreExecute() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String adminName = UUID.randomUUID().toString();
+    String ownerName = UUID.randomUUID().toString();
+    doPreExecute(volumeName, adminName, ownerName);
+  }
+
+
+  @Test
+  public void testValidateAndUpdateCacheWithZeroMaxUserVolumeCount()
+      throws Exception {
+    when(ozoneManager.getMaxUserVolumeCount()).thenReturn(0L);
+    String volumeName = UUID.randomUUID().toString();
+    String adminName = "user1";
+    String ownerName = "user1";
+
+    OMRequest originalRequest = createVolumeRequest(volumeName, adminName,
+        ownerName);
+
+    OMVolumeCreateRequest omVolumeCreateRequest =
+        new OMVolumeCreateRequest(originalRequest);
+
+    omVolumeCreateRequest.preExecute(ozoneManager);
+
+    try {
+      OMClientResponse omClientResponse =
+          omVolumeCreateRequest.validateAndUpdateCache(ozoneManager, 1);
+    } catch (IllegalArgumentException ex){
+      GenericTestUtils.assertExceptionContains("should be greater than zero",
+          ex);
+    }
+
+  }
+
+  @Test
+  public void testValidateAndUpdateCacheSuccess() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String adminName = "user1";
+    String ownerName = "user1";
+
+    OMRequest originalRequest = createVolumeRequest(volumeName, adminName,
+        ownerName);
+
+    OMVolumeCreateRequest omVolumeCreateRequest =
+        new OMVolumeCreateRequest(originalRequest);
+
+    omVolumeCreateRequest.preExecute(ozoneManager);
+
+    String volumeKey = omMetadataManager.getVolumeKey(volumeName);
+    String ownerKey = omMetadataManager.getUserKey(ownerName);
+
+    // As we have not still called validateAndUpdateCache, get() should
+    // return null.
+
+    Assert.assertNull(omMetadataManager.getVolumeTable().get(volumeKey));
+    Assert.assertNull(omMetadataManager.getUserTable().get(ownerKey));
+
+    OMClientResponse omClientResponse =
+        omVolumeCreateRequest.validateAndUpdateCache(ozoneManager, 1);
+
+    OzoneManagerProtocolProtos.OMResponse omResponse =
+        omClientResponse.getOMResponse();
+    Assert.assertNotNull(omResponse.getCreateVolumeResponse());
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+        omResponse.getStatus());
+
+
+    // Get volumeInfo from request.
+    VolumeInfo volumeInfo = omVolumeCreateRequest.getOmRequest()
+        .getCreateVolumeRequest().getVolumeInfo();
+
+    OmVolumeArgs omVolumeArgs =
+        omMetadataManager.getVolumeTable().get(volumeKey);
+    // As request is valid volume table should not have entry.
+    Assert.assertNotNull(omVolumeArgs);
+
+    // Check data from table and request.
+    Assert.assertEquals(volumeInfo.getVolume(), omVolumeArgs.getVolume());
+    Assert.assertEquals(volumeInfo.getOwnerName(), omVolumeArgs.getOwnerName());
+    Assert.assertEquals(volumeInfo.getAdminName(), omVolumeArgs.getAdminName());
+    Assert.assertEquals(volumeInfo.getCreationTime(),
+        omVolumeArgs.getCreationTime());
+
+    OzoneManagerProtocolProtos.VolumeList volumeList = omMetadataManager
+        .getUserTable().get(ownerKey);
+    Assert.assertNotNull(volumeList);
+    Assert.assertEquals(volumeName, volumeList.getVolumeNames(0));
+
+  }
+
+
+  @Test
+  public void testValidateAndUpdateCacheWithVolumeAlreadyExists()
+      throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String adminName = "user1";
+    String ownerName = "user1";
+
+    TestOMRequestUtils.addVolumeToDB(volumeName, omMetadataManager);
+
+    OMRequest originalRequest = createVolumeRequest(volumeName, adminName,
+        ownerName);
+
+    OMVolumeCreateRequest omVolumeCreateRequest =
+        new OMVolumeCreateRequest(originalRequest);
+
+    omVolumeCreateRequest.preExecute(ozoneManager);
+
+    OMClientResponse omClientResponse =
+        omVolumeCreateRequest.validateAndUpdateCache(ozoneManager, 1);
+
+    OzoneManagerProtocolProtos.OMResponse omResponse =
+        omClientResponse.getOMResponse();
+    Assert.assertNotNull(omResponse.getCreateVolumeResponse());
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.VOLUME_ALREADY_EXISTS,
+        omResponse.getStatus());
+
+    // Check really if we have a volume with the specified volume name.
+    Assert.assertNotNull(omMetadataManager.getVolumeTable().get(
+        omMetadataManager.getVolumeKey(volumeName)));
+
+  }
+
+
+  private void doPreExecute(String volumeName,
+      String adminName, String ownerName) throws Exception {
+
+    OMRequest originalRequest = createVolumeRequest(volumeName, adminName,
+        ownerName);
+
+    OMVolumeCreateRequest omVolumeCreateRequest =
+        new OMVolumeCreateRequest(originalRequest);
+
+    OMRequest modifiedRequest = omVolumeCreateRequest.preExecute(ozoneManager);
+    verifyRequest(modifiedRequest, originalRequest);
+  }
+
+  /**
+   * Verify modifiedOmRequest and originalRequest.
+   * @param modifiedRequest
+   * @param originalRequest
+   */
+  private void verifyRequest(OMRequest modifiedRequest,
+      OMRequest originalRequest) {
+    VolumeInfo original = originalRequest.getCreateVolumeRequest()
+        .getVolumeInfo();
+    VolumeInfo updated = modifiedRequest.getCreateVolumeRequest()
+        .getVolumeInfo();
+
+    Assert.assertEquals(original.getAdminName(), updated.getAdminName());
+    Assert.assertEquals(original.getVolume(), updated.getVolume());
+    Assert.assertEquals(original.getOwnerName(),
+        updated.getOwnerName());
+    Assert.assertNotEquals(original.getCreationTime(),
+        updated.getCreationTime());
+  }
+
+  /**
+   * Create OMRequest for create volume.
+   * @param volumeName
+   * @param adminName
+   * @param ownerName
+   * @return OMRequest
+   */
+  private OMRequest createVolumeRequest(String volumeName, String adminName,
+      String ownerName) {
+    VolumeInfo volumeInfo = VolumeInfo.newBuilder().setVolume(volumeName)
+        .setAdminName(adminName).setOwnerName(ownerName).build();
+    CreateVolumeRequest createVolumeRequest =
+        CreateVolumeRequest.newBuilder().setVolumeInfo(volumeInfo).build();
+
+    return OMRequest.newBuilder().setClientId(UUID.randomUUID().toString())
+        .setCmdType(OzoneManagerProtocolProtos.Type.CreateVolume)
+        .setCreateVolumeRequest(createVolumeRequest).build();
+  }
+}

+ 222 - 0
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/TestOMVolumeDeleteRequest.java

@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.volume;
+
+import java.util.UUID;
+
+import com.google.common.base.Optional;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.AuditMessage;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.utils.db.cache.CacheKey;
+import org.apache.hadoop.utils.db.cache.CacheValue;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .DeleteVolumeRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMRequest;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests delete volume request.
+ */
+public class TestOMVolumeDeleteRequest {
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  private OzoneManager ozoneManager;
+  private OMMetrics omMetrics;
+  private OMMetadataManager omMetadataManager;
+  private AuditLogger auditLogger;
+
+
+  @Before
+  public void setup() throws Exception {
+    ozoneManager = mock(OzoneManager.class);
+    omMetrics = OMMetrics.create();
+    OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
+    ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
+        folder.newFolder().getAbsolutePath());
+    omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration);
+    when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
+    when(ozoneManager.getMetrics()).thenReturn(omMetrics);
+    auditLogger = Mockito.mock(AuditLogger.class);
+    when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
+    Mockito.doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
+
+  }
+
+  @After
+  public void stop() {
+    omMetrics.unRegister();
+    Mockito.framework().clearInlineMocks();
+  }
+
+  @Test
+  public void testPreExecute() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String ownerName = UUID.randomUUID().toString();
+    OMRequest originalRequest = deleteVolumeRequest(volumeName, ownerName);
+
+    OMVolumeDeleteRequest omVolumeDeleteRequest =
+        new OMVolumeDeleteRequest(originalRequest);
+
+    OMRequest modifiedRequest = omVolumeDeleteRequest.preExecute(ozoneManager);
+    Assert.assertNotEquals(originalRequest, modifiedRequest);
+  }
+
+
+  @Test
+  public void testValidateAndUpdateCacheSuccess() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String ownerName = "user1";
+
+    OMRequest originalRequest = deleteVolumeRequest(volumeName, ownerName);
+
+    OMVolumeDeleteRequest omVolumeDeleteRequest =
+        new OMVolumeDeleteRequest(originalRequest);
+
+    omVolumeDeleteRequest.preExecute(ozoneManager);
+
+    // Add volume and user to DB
+    TestOMRequestUtils.addVolumeToDB(volumeName, ownerName, omMetadataManager);
+    TestOMRequestUtils.addUserToDB(volumeName, ownerName, omMetadataManager);
+
+    String volumeKey = omMetadataManager.getVolumeKey(volumeName);
+    String ownerKey = omMetadataManager.getUserKey(ownerName);
+
+
+    Assert.assertNotNull(omMetadataManager.getVolumeTable().get(volumeKey));
+    Assert.assertNotNull(omMetadataManager.getUserTable().get(ownerKey));
+
+    OMClientResponse omClientResponse =
+        omVolumeDeleteRequest.validateAndUpdateCache(ozoneManager, 1);
+
+    OzoneManagerProtocolProtos.OMResponse omResponse =
+        omClientResponse.getOMResponse();
+    Assert.assertNotNull(omResponse.getCreateVolumeResponse());
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+        omResponse.getStatus());
+
+
+
+    Assert.assertTrue(omMetadataManager.getUserTable().get(ownerKey)
+        .getVolumeNamesList().size() == 0);
+    // As now volume is deleted, table should not have those entries.
+    Assert.assertNull(omMetadataManager.getVolumeTable().get(volumeKey));
+
+  }
+
+
+  @Test
+  public void testValidateAndUpdateCacheWithVolumeNotFound()
+      throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String ownerName = "user1";
+
+    OMRequest originalRequest = deleteVolumeRequest(volumeName, ownerName);
+
+    OMVolumeDeleteRequest omVolumeDeleteRequest =
+        new OMVolumeDeleteRequest(originalRequest);
+
+    omVolumeDeleteRequest.preExecute(ozoneManager);
+
+    OMClientResponse omClientResponse =
+        omVolumeDeleteRequest.validateAndUpdateCache(ozoneManager, 1);
+
+    OzoneManagerProtocolProtos.OMResponse omResponse =
+        omClientResponse.getOMResponse();
+    Assert.assertNotNull(omResponse.getCreateVolumeResponse());
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.VOLUME_NOT_FOUND,
+        omResponse.getStatus());
+
+  }
+
+
+  @Test
+  public void testValidateAndUpdateCacheWithVolumeNotEmpty() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String ownerName = "user1";
+
+    OMRequest originalRequest = deleteVolumeRequest(volumeName, ownerName);
+
+    OMVolumeDeleteRequest omVolumeDeleteRequest =
+        new OMVolumeDeleteRequest(originalRequest);
+
+    omVolumeDeleteRequest.preExecute(ozoneManager);
+
+    // Add some bucket to bucket table cache.
+    String bucketName = UUID.randomUUID().toString();
+    String bucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
+
+    OmBucketInfo omBucketInfo = OmBucketInfo.newBuilder()
+        .setVolumeName(volumeName).setBucketName(bucketName).build();
+    omMetadataManager.getBucketTable().addCacheEntry(new CacheKey<>(bucketKey),
+        new CacheValue<>(Optional.of(omBucketInfo), 1L));
+
+    // Add user and volume to DB.
+    TestOMRequestUtils.addUserToDB(volumeName, ownerName, omMetadataManager);
+    TestOMRequestUtils.addVolumeToDB(volumeName, ownerName, omMetadataManager);
+
+    OMClientResponse omClientResponse =
+        omVolumeDeleteRequest.validateAndUpdateCache(ozoneManager, 1L);
+
+    OzoneManagerProtocolProtos.OMResponse omResponse =
+        omClientResponse.getOMResponse();
+    Assert.assertNotNull(omResponse.getCreateVolumeResponse());
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.VOLUME_NOT_EMPTY,
+        omResponse.getStatus());
+  }
+
+  /**
+   * Create OMRequest for delete volume.
+   * @param volumeName
+   * @param ownerName
+   * @return OMRequest
+   */
+  private OMRequest deleteVolumeRequest(String volumeName,
+      String ownerName) {
+    DeleteVolumeRequest deleteVolumeRequest =
+        DeleteVolumeRequest.newBuilder().setVolumeName(volumeName)
+            .setOwner(ownerName).build();
+
+    return OMRequest.newBuilder().setClientId(UUID.randomUUID().toString())
+        .setCmdType(OzoneManagerProtocolProtos.Type.DeleteVolume)
+        .setDeleteVolumeRequest(deleteVolumeRequest).build();
+  }
+}

+ 204 - 0
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/TestOMVolumeSetOwnerRequest.java

@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.volume;
+
+import java.util.UUID;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.AuditMessage;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMRequest;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests set volume property request.
+ */
+public class TestOMVolumeSetOwnerRequest {
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  private OzoneManager ozoneManager;
+  private OMMetrics omMetrics;
+  private OMMetadataManager omMetadataManager;
+  private AuditLogger auditLogger;
+
+
+  @Before
+  public void setup() throws Exception {
+    ozoneManager = Mockito.mock(OzoneManager.class);
+    omMetrics = OMMetrics.create();
+    OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
+    ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
+        folder.newFolder().getAbsolutePath());
+    omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration);
+    when(ozoneManager.getMetrics()).thenReturn(omMetrics);
+    when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
+    when(ozoneManager.getMaxUserVolumeCount()).thenReturn(10L);
+    auditLogger = Mockito.mock(AuditLogger.class);
+    when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
+    Mockito.doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
+  }
+
+  @After
+  public void stop() {
+    omMetrics.unRegister();
+    Mockito.framework().clearInlineMocks();
+  }
+
+  @Test
+  public void testPreExecute() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String newOwner = "user1";
+    OMRequest originalRequest =
+        TestOMRequestUtils.createSetVolumePropertyRequest(volumeName, newOwner);
+
+    OMVolumeSetQuotaRequest omVolumeSetQuotaRequest =
+        new OMVolumeSetQuotaRequest(originalRequest);
+
+    OMRequest modifiedRequest = omVolumeSetQuotaRequest.preExecute(
+        ozoneManager);
+    Assert.assertNotEquals(modifiedRequest, originalRequest);
+  }
+
+
+  @Test
+  public void testValidateAndUpdateCacheSuccess() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String ownerName = "user1";
+
+    TestOMRequestUtils.addUserToDB(volumeName, ownerName, omMetadataManager);
+    TestOMRequestUtils.addVolumeToDB(volumeName, ownerName, omMetadataManager);
+
+    String newOwner = "user2";
+
+    OMRequest originalRequest =
+        TestOMRequestUtils.createSetVolumePropertyRequest(volumeName, newOwner);
+
+    OMVolumeSetOwnerRequest omVolumeSetOwnerRequest =
+        new OMVolumeSetOwnerRequest(originalRequest);
+
+    omVolumeSetOwnerRequest.preExecute(ozoneManager);
+
+    String volumeKey = omMetadataManager.getVolumeKey(volumeName);
+    String ownerKey = omMetadataManager.getUserKey(ownerName);
+    String newOwnerKey = omMetadataManager.getUserKey(newOwner);
+
+
+
+    OMClientResponse omClientResponse =
+        omVolumeSetOwnerRequest.validateAndUpdateCache(ozoneManager, 1);
+
+    OzoneManagerProtocolProtos.OMResponse omResponse =
+        omClientResponse.getOMResponse();
+    Assert.assertNotNull(omResponse.getSetVolumePropertyResponse());
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+        omResponse.getStatus());
+
+
+    String fromDBOwner = omMetadataManager
+        .getVolumeTable().get(volumeKey).getOwnerName();
+    Assert.assertEquals(newOwner, fromDBOwner);
+
+
+    OzoneManagerProtocolProtos.VolumeList newOwnerVolumeList =
+        omMetadataManager.getUserTable().get(newOwnerKey);
+
+    Assert.assertNotNull(newOwnerVolumeList);
+    Assert.assertEquals(volumeName,
+        newOwnerVolumeList.getVolumeNamesList().get(0));
+
+    OzoneManagerProtocolProtos.VolumeList oldOwnerVolumeList =
+        omMetadataManager.getUserTable().get(
+            omMetadataManager.getUserKey(ownerKey));
+
+    Assert.assertNotNull(oldOwnerVolumeList);
+    Assert.assertTrue(oldOwnerVolumeList.getVolumeNamesList().size() == 0);
+
+  }
+
+
+  @Test
+  public void testValidateAndUpdateCacheWithVolumeNotFound()
+      throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String ownerName = "user1";
+
+    OMRequest originalRequest =
+        TestOMRequestUtils.createSetVolumePropertyRequest(volumeName,
+            ownerName);
+
+    OMVolumeSetOwnerRequest omVolumeSetOwnerRequest =
+        new OMVolumeSetOwnerRequest(originalRequest);
+
+    omVolumeSetOwnerRequest.preExecute(ozoneManager);
+
+    OMClientResponse omClientResponse =
+        omVolumeSetOwnerRequest.validateAndUpdateCache(ozoneManager, 1);
+
+    OzoneManagerProtocolProtos.OMResponse omResponse =
+        omClientResponse.getOMResponse();
+    Assert.assertNotNull(omResponse.getCreateVolumeResponse());
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.VOLUME_NOT_FOUND,
+        omResponse.getStatus());
+
+  }
+
+  @Test
+  public void testInvalidRequest() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+
+    // create request with quota set.
+    OMRequest originalRequest =
+        TestOMRequestUtils.createSetVolumePropertyRequest(volumeName,
+            100L);
+
+    OMVolumeSetOwnerRequest omVolumeSetOwnerRequest =
+        new OMVolumeSetOwnerRequest(originalRequest);
+
+    omVolumeSetOwnerRequest.preExecute(ozoneManager);
+
+    OMClientResponse omClientResponse =
+        omVolumeSetOwnerRequest.validateAndUpdateCache(ozoneManager, 1);
+
+    OzoneManagerProtocolProtos.OMResponse omResponse =
+        omClientResponse.getOMResponse();
+    Assert.assertNotNull(omResponse.getCreateVolumeResponse());
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.INVALID_REQUEST,
+        omResponse.getStatus());
+  }
+}

+ 195 - 0
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/TestOMVolumeSetQuotaRequest.java

@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.volume;
+
+import java.util.UUID;
+
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.AuditMessage;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMRequest;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests set volume property request.
+ */
+public class TestOMVolumeSetQuotaRequest {
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  private OzoneManager ozoneManager;
+  private OMMetrics omMetrics;
+  private OMMetadataManager omMetadataManager;
+  private AuditLogger auditLogger;
+
+  @Before
+  public void setup() throws Exception {
+    ozoneManager = Mockito.mock(OzoneManager.class);
+    omMetrics = OMMetrics.create();
+    OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
+    ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
+        folder.newFolder().getAbsolutePath());
+    omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration);
+    when(ozoneManager.getMetrics()).thenReturn(omMetrics);
+    when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
+    auditLogger = Mockito.mock(AuditLogger.class);
+    when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
+    Mockito.doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
+  }
+
+  @After
+  public void stop() {
+    omMetrics.unRegister();
+    Mockito.framework().clearInlineMocks();
+  }
+
+  @Test
+  public void testPreExecute() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    long quota = 100L;
+    OMRequest originalRequest =
+        TestOMRequestUtils.createSetVolumePropertyRequest(volumeName, quota);
+
+    OMVolumeSetQuotaRequest omVolumeSetQuotaRequest =
+        new OMVolumeSetQuotaRequest(originalRequest);
+
+    OMRequest modifiedRequest = omVolumeSetQuotaRequest.preExecute(
+        ozoneManager);
+    Assert.assertNotEquals(modifiedRequest, originalRequest);
+  }
+
+
+  @Test
+  public void testValidateAndUpdateCacheSuccess() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String ownerName = "user1";
+    long quotaSet = 100L;
+
+    TestOMRequestUtils.addUserToDB(volumeName, ownerName, omMetadataManager);
+    TestOMRequestUtils.addVolumeToDB(volumeName, ownerName, omMetadataManager);
+
+
+    OMRequest originalRequest =
+        TestOMRequestUtils.createSetVolumePropertyRequest(volumeName, quotaSet);
+
+    OMVolumeSetQuotaRequest omVolumeSetQuotaRequest =
+        new OMVolumeSetQuotaRequest(originalRequest);
+
+    omVolumeSetQuotaRequest.preExecute(ozoneManager);
+
+    String volumeKey = omMetadataManager.getVolumeKey(volumeName);
+
+
+    // Get Quota before validateAndUpdateCache.
+    OmVolumeArgs omVolumeArgs =
+        omMetadataManager.getVolumeTable().get(volumeKey);
+    // As request is valid volume table should not have entry.
+    Assert.assertNotNull(omVolumeArgs);
+    long quotaBeforeSet = omVolumeArgs.getQuotaInBytes();
+
+
+    OMClientResponse omClientResponse =
+        omVolumeSetQuotaRequest.validateAndUpdateCache(ozoneManager, 1);
+
+    OzoneManagerProtocolProtos.OMResponse omResponse =
+        omClientResponse.getOMResponse();
+    Assert.assertNotNull(omResponse.getSetVolumePropertyResponse());
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+        omResponse.getStatus());
+
+
+    long quotaAfterSet = omMetadataManager
+        .getVolumeTable().get(volumeKey).getQuotaInBytes();
+    Assert.assertEquals(quotaSet, quotaAfterSet);
+    Assert.assertNotEquals(quotaBeforeSet, quotaAfterSet);
+
+  }
+
+
+  @Test
+  public void testValidateAndUpdateCacheWithVolumeNotFound()
+      throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String ownerName = "user1";
+    long quota = 100L;
+
+    OMRequest originalRequest =
+        TestOMRequestUtils.createSetVolumePropertyRequest(volumeName, quota);
+
+    OMVolumeSetQuotaRequest omVolumeSetQuotaRequest =
+        new OMVolumeSetQuotaRequest(originalRequest);
+
+    omVolumeSetQuotaRequest.preExecute(ozoneManager);
+
+    OMClientResponse omClientResponse =
+        omVolumeSetQuotaRequest.validateAndUpdateCache(ozoneManager, 1);
+
+    OzoneManagerProtocolProtos.OMResponse omResponse =
+        omClientResponse.getOMResponse();
+    Assert.assertNotNull(omResponse.getCreateVolumeResponse());
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.VOLUME_NOT_FOUND,
+        omResponse.getStatus());
+
+  }
+
+  @Test
+  public void testInvalidRequest() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+
+    // create request with owner set.
+    OMRequest originalRequest =
+        TestOMRequestUtils.createSetVolumePropertyRequest(volumeName,
+            "user1");
+
+    // Creating OMVolumeSetQuotaRequest with SetProperty request set with owner.
+    OMVolumeSetQuotaRequest omVolumeSetQuotaRequest =
+        new OMVolumeSetQuotaRequest(originalRequest);
+
+    omVolumeSetQuotaRequest.preExecute(ozoneManager);
+
+    OMClientResponse omClientResponse =
+        omVolumeSetQuotaRequest.validateAndUpdateCache(ozoneManager, 1);
+
+    OzoneManagerProtocolProtos.OMResponse omResponse =
+        omClientResponse.getOMResponse();
+    Assert.assertNotNull(omResponse.getCreateVolumeResponse());
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.INVALID_REQUEST,
+        omResponse.getStatus());
+  }
+}

+ 21 - 0
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/package-info.java

@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package contains test classes for volume requests.
+ */
+package org.apache.hadoop.ozone.om.request.volume;

+ 125 - 0
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeCreateResponse.java

@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.volume;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .CreateVolumeResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .VolumeList;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.db.BatchOperation;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.junit.Assert.fail;
+
+/**
+ * This class tests OMVolumeCreateResponse.
+ */
+public class TestOMVolumeCreateResponse {
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  private OMMetadataManager omMetadataManager;
+  private BatchOperation batchOperation;
+
+  @Before
+  public void setup() throws Exception {
+    OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
+    ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
+        folder.newFolder().getAbsolutePath());
+    omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration);
+    batchOperation = omMetadataManager.getStore().initBatchOperation();
+  }
+
+  @Test
+  public void testAddToDBBatch() throws Exception {
+
+    String volumeName = UUID.randomUUID().toString();
+    String userName = "user1";
+    VolumeList volumeList = VolumeList.newBuilder()
+        .addVolumeNames(volumeName).build();
+
+    OMResponse omResponse = OMResponse.newBuilder()
+            .setCmdType(OzoneManagerProtocolProtos.Type.CreateVolume)
+            .setStatus(OzoneManagerProtocolProtos.Status.OK)
+            .setSuccess(true)
+            .setCreateVolumeResponse(CreateVolumeResponse.getDefaultInstance())
+        .build();
+
+    OmVolumeArgs omVolumeArgs = OmVolumeArgs.newBuilder()
+        .setOwnerName(userName).setAdminName(userName)
+        .setVolume(volumeName).setCreationTime(Time.now()).build();
+    OMVolumeCreateResponse omVolumeCreateResponse =
+        new OMVolumeCreateResponse(omVolumeArgs, volumeList, omResponse);
+
+    omVolumeCreateResponse.addToDBBatch(omMetadataManager, batchOperation);
+
+    // Do manual commit and see whether addToBatch is successful or not.
+    omMetadataManager.getStore().commitBatchOperation(batchOperation);
+
+    Assert.assertEquals(omVolumeArgs,
+        omMetadataManager.getVolumeTable().get(
+            omMetadataManager.getVolumeKey(volumeName)));
+
+    Assert.assertEquals(volumeList,
+        omMetadataManager.getUserTable().get(
+            omMetadataManager.getUserKey(userName)));
+  }
+
+  @Test
+  public void testAddToDBBatchNoOp() throws Exception {
+
+    OMResponse omResponse = OMResponse.newBuilder()
+        .setCmdType(OzoneManagerProtocolProtos.Type.CreateVolume)
+        .setStatus(OzoneManagerProtocolProtos.Status.VOLUME_ALREADY_EXISTS)
+        .setSuccess(false)
+        .setCreateVolumeResponse(CreateVolumeResponse.getDefaultInstance())
+        .build();
+
+    OMVolumeCreateResponse omVolumeCreateResponse =
+        new OMVolumeCreateResponse(null, null, omResponse);
+
+    try {
+      omVolumeCreateResponse.addToDBBatch(omMetadataManager, batchOperation);
+      Assert.assertTrue(omMetadataManager.countRowsInTable(
+          omMetadataManager.getVolumeTable()) == 0);
+    } catch (IOException ex) {
+      fail("testAddToDBBatchFailure failed");
+    }
+
+  }
+
+
+}

+ 130 - 0
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeDeleteResponse.java

@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.volume;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .CreateVolumeResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .VolumeList;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.db.BatchOperation;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.junit.Assert.fail;
+
+/**
+ * This class tests OMVolumeCreateResponse.
+ */
+public class TestOMVolumeDeleteResponse {
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  private OMMetadataManager omMetadataManager;
+  private BatchOperation batchOperation;
+
+  @Before
+  public void setup() throws Exception {
+    OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
+    ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
+        folder.newFolder().getAbsolutePath());
+    omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration);
+    batchOperation = omMetadataManager.getStore().initBatchOperation();
+  }
+
+  @Test
+  public void testAddToDBBatch() throws Exception {
+
+    String volumeName = UUID.randomUUID().toString();
+    String userName = "user1";
+    VolumeList volumeList = VolumeList.newBuilder()
+        .addVolumeNames(volumeName).build();
+
+    OMResponse omResponse = OMResponse.newBuilder()
+        .setCmdType(OzoneManagerProtocolProtos.Type.DeleteVolume)
+        .setStatus(OzoneManagerProtocolProtos.Status.OK)
+        .setSuccess(true)
+        .setCreateVolumeResponse(CreateVolumeResponse.getDefaultInstance())
+        .build();
+
+    OmVolumeArgs omVolumeArgs = OmVolumeArgs.newBuilder()
+        .setOwnerName(userName).setAdminName(userName)
+        .setVolume(volumeName).setCreationTime(Time.now()).build();
+    OMVolumeCreateResponse omVolumeCreateResponse =
+        new OMVolumeCreateResponse(omVolumeArgs, volumeList, omResponse);
+
+    // As we are deleting updated volume list should be empty.
+    VolumeList updatedVolumeList = VolumeList.newBuilder().build();
+    OMVolumeDeleteResponse omVolumeDeleteResponse =
+        new OMVolumeDeleteResponse(volumeName, userName, updatedVolumeList,
+            omResponse);
+
+    omVolumeCreateResponse.addToDBBatch(omMetadataManager, batchOperation);
+    omVolumeDeleteResponse.addToDBBatch(omMetadataManager, batchOperation);
+
+    // Do manual commit and see whether addToBatch is successful or not.
+    omMetadataManager.getStore().commitBatchOperation(batchOperation);
+
+    Assert.assertNull(null,
+        omMetadataManager.getVolumeTable().get(
+            omMetadataManager.getVolumeKey(volumeName)));
+
+    Assert.assertEquals(null,
+        omMetadataManager.getUserTable().get(
+            omMetadataManager.getUserKey(userName)));
+  }
+
+  @Test
+  public void testAddToDBBatchNoOp() throws Exception {
+
+    OMResponse omResponse = OMResponse.newBuilder()
+        .setCmdType(OzoneManagerProtocolProtos.Type.DeleteVolume)
+        .setStatus(OzoneManagerProtocolProtos.Status.VOLUME_NOT_FOUND)
+        .setSuccess(false)
+        .setCreateVolumeResponse(CreateVolumeResponse.getDefaultInstance())
+        .build();
+
+    OMVolumeDeleteResponse omVolumeDeleteResponse =
+        new OMVolumeDeleteResponse(null, null, null, omResponse);
+
+    try {
+      omVolumeDeleteResponse.addToDBBatch(omMetadataManager, batchOperation);
+    } catch (IOException ex) {
+      fail("testAddToDBBatchFailure failed");
+    }
+
+  }
+
+
+}

+ 142 - 0
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeSetOwnerResponse.java

@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.volume;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .CreateVolumeResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .VolumeList;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.db.BatchOperation;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.junit.Assert.fail;
+
+/**
+ * This class tests OMVolumeCreateResponse.
+ */
+public class TestOMVolumeSetOwnerResponse {
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  private OMMetadataManager omMetadataManager;
+  private BatchOperation batchOperation;
+
+  @Before
+  public void setup() throws Exception {
+    OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
+    ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
+        folder.newFolder().getAbsolutePath());
+    omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration);
+    batchOperation = omMetadataManager.getStore().initBatchOperation();
+  }
+
+  @Test
+  public void testAddToDBBatch() throws Exception {
+
+    String volumeName = UUID.randomUUID().toString();
+    String oldOwner = "user1";
+    VolumeList volumeList = VolumeList.newBuilder()
+        .addVolumeNames(volumeName).build();
+
+    OMResponse omResponse = OMResponse.newBuilder()
+        .setCmdType(OzoneManagerProtocolProtos.Type.SetVolumeProperty)
+        .setStatus(OzoneManagerProtocolProtos.Status.OK)
+        .setSuccess(true)
+        .setCreateVolumeResponse(CreateVolumeResponse.getDefaultInstance())
+        .build();
+
+    OmVolumeArgs omVolumeArgs = OmVolumeArgs.newBuilder()
+        .setOwnerName(oldOwner).setAdminName(oldOwner)
+        .setVolume(volumeName).setCreationTime(Time.now()).build();
+    OMVolumeCreateResponse omVolumeCreateResponse =
+        new OMVolumeCreateResponse(omVolumeArgs, volumeList, omResponse);
+
+
+
+    String newOwner = "user2";
+    VolumeList newOwnerVolumeList = VolumeList.newBuilder()
+        .addVolumeNames(volumeName).build();
+    VolumeList oldOwnerVolumeList = VolumeList.newBuilder().build();
+    OmVolumeArgs newOwnerVolumeArgs = OmVolumeArgs.newBuilder()
+        .setOwnerName(newOwner).setAdminName(newOwner)
+        .setVolume(volumeName).setCreationTime(omVolumeArgs.getCreationTime())
+        .build();
+
+    OMVolumeSetOwnerResponse omVolumeSetOwnerResponse =
+        new OMVolumeSetOwnerResponse(oldOwner,  oldOwnerVolumeList,
+            newOwnerVolumeList, newOwnerVolumeArgs, omResponse);
+
+    omVolumeCreateResponse.addToDBBatch(omMetadataManager, batchOperation);
+    omVolumeSetOwnerResponse.addToDBBatch(omMetadataManager, batchOperation);
+
+    // Do manual commit and see whether addToBatch is successful or not.
+    omMetadataManager.getStore().commitBatchOperation(batchOperation);
+
+
+    Assert.assertEquals(newOwnerVolumeArgs,
+        omMetadataManager.getVolumeTable().get(
+            omMetadataManager.getVolumeKey(volumeName)));
+
+    Assert.assertEquals(volumeList,
+        omMetadataManager.getUserTable().get(
+            omMetadataManager.getUserKey(newOwner)));
+  }
+
+  @Test
+  public void testAddToDBBatchNoOp() throws Exception {
+
+    OMResponse omResponse = OMResponse.newBuilder()
+        .setCmdType(OzoneManagerProtocolProtos.Type.SetVolumeProperty)
+        .setStatus(OzoneManagerProtocolProtos.Status.VOLUME_NOT_FOUND)
+        .setSuccess(false)
+        .setCreateVolumeResponse(CreateVolumeResponse.getDefaultInstance())
+        .build();
+
+    OMVolumeSetOwnerResponse omVolumeSetOwnerResponse =
+        new OMVolumeSetOwnerResponse(null, null, null, null, omResponse);
+
+    try {
+      omVolumeSetOwnerResponse.addToDBBatch(omMetadataManager, batchOperation);
+      Assert.assertTrue(omMetadataManager.countRowsInTable(
+          omMetadataManager.getVolumeTable()) == 0);
+    } catch (IOException ex) {
+      fail("testAddToDBBatchFailure failed");
+    }
+
+  }
+
+
+}

+ 117 - 0
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeSetQuotaResponse.java

@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.volume;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .CreateVolumeResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.db.BatchOperation;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.junit.Assert.fail;
+
+/**
+ * This class tests OMVolumeCreateResponse.
+ */
+public class TestOMVolumeSetQuotaResponse {
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  private OMMetadataManager omMetadataManager;
+  private BatchOperation batchOperation;
+
+  @Before
+  public void setup() throws Exception {
+    OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
+    ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
+        folder.newFolder().getAbsolutePath());
+    omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration);
+    batchOperation = omMetadataManager.getStore().initBatchOperation();
+  }
+
+  @Test
+  public void testAddToDBBatch() throws Exception {
+
+    String volumeName = UUID.randomUUID().toString();
+    String userName = "user1";
+
+    OMResponse omResponse = OMResponse.newBuilder()
+        .setCmdType(OzoneManagerProtocolProtos.Type.SetVolumeProperty)
+        .setStatus(OzoneManagerProtocolProtos.Status.OK)
+        .setSuccess(true)
+        .setCreateVolumeResponse(CreateVolumeResponse.getDefaultInstance())
+        .build();
+
+    OmVolumeArgs omVolumeArgs = OmVolumeArgs.newBuilder()
+        .setOwnerName(userName).setAdminName(userName)
+        .setVolume(volumeName).setCreationTime(Time.now()).build();
+    OMVolumeSetQuotaResponse omVolumeSetQuotaResponse =
+        new OMVolumeSetQuotaResponse(omVolumeArgs, omResponse);
+
+    omVolumeSetQuotaResponse.addToDBBatch(omMetadataManager, batchOperation);
+
+    // Do manual commit and see whether addToBatch is successful or not.
+    omMetadataManager.getStore().commitBatchOperation(batchOperation);
+
+    Assert.assertEquals(omVolumeArgs,
+        omMetadataManager.getVolumeTable().get(
+            omMetadataManager.getVolumeKey(volumeName)));
+
+  }
+
+  @Test
+  public void testAddToDBBatchNoOp() throws Exception {
+
+    OMResponse omResponse = OMResponse.newBuilder()
+        .setCmdType(OzoneManagerProtocolProtos.Type.CreateVolume)
+        .setStatus(OzoneManagerProtocolProtos.Status.VOLUME_NOT_FOUND)
+        .setSuccess(false)
+        .setCreateVolumeResponse(CreateVolumeResponse.getDefaultInstance())
+        .build();
+
+    OMVolumeSetQuotaResponse omVolumeSetQuotaResponse =
+        new OMVolumeSetQuotaResponse(null, omResponse);
+
+    try {
+      omVolumeSetQuotaResponse.addToDBBatch(omMetadataManager, batchOperation);
+      Assert.assertTrue(omMetadataManager.countRowsInTable(
+          omMetadataManager.getVolumeTable()) == 0);
+    } catch (IOException ex) {
+      fail("testAddToDBBatchFailure failed");
+    }
+  }
+
+
+}