Bläddra i källkod

HDDS-795. RocksDb specific classes leak from DBStore/Table interfaces. Contributed by Márton Elek.

Ajay Kumar 6 år sedan
förälder
incheckning
8d2789c5eb

+ 27 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/BatchOperation.java

@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.utils.db;
+
+/**
+ * Class represents a batch operation, collects multiple db operation.
+ */
+public interface BatchOperation extends AutoCloseable {
+
+  void close();
+}

+ 17 - 6
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java

@@ -19,12 +19,11 @@
 
 package org.apache.hadoop.utils.db;
 
-import org.apache.hadoop.classification.InterfaceStability;
-import org.rocksdb.WriteBatch;
-
 import java.io.IOException;
 import java.util.ArrayList;
 
+import org.apache.hadoop.classification.InterfaceStability;
+
 /**
  * The DBStore interface provides the ability to create Tables, which store
  * a specific type of Key-Value pair. Some DB interfaces like LevelDB will not
@@ -107,9 +106,21 @@ public interface DBStore extends AutoCloseable {
   long getEstimatedKeyCount() throws IOException;
 
   /**
-   * Writes a transaction into the DB using the default write Options.
-   * @param batch - Batch to write.
+   * Initialize an atomic batch operation which can hold multiple PUT/DELETE
+   * operations and committed later in one step.
+   *
+   * @return BatchOperation holder which can be used to add or commit batch
+   * operations.
+   */
+  BatchOperation initBatchOperation();
+
+  /**
+   * Commit the batch operations.
+   *
+   * @param operation which contains all the required batch operation.
+   * @throws IOException on Failure.
    */
-  void write(WriteBatch batch) throws IOException;
+  void commitBatchOperation(BatchOperation operation) throws IOException;
+
 
 }

+ 69 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBBatchOperation.java

@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.utils.db;
+
+import java.io.IOException;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+/**
+ * Batch operation implementation for rocks db.
+ */
+public class RDBBatchOperation implements BatchOperation {
+
+  private final WriteBatch writeBatch;
+
+  public RDBBatchOperation() {
+    writeBatch = new WriteBatch();
+  }
+
+  public void commit(RocksDB db, WriteOptions writeOptions) throws IOException {
+    try {
+      db.write(writeOptions, writeBatch);
+    } catch (RocksDBException e) {
+      throw new IOException("Unable to write the batch.", e);
+    }
+  }
+
+  @Override
+  public void close() {
+    writeBatch.close();
+  }
+
+  public void delete(ColumnFamilyHandle handle, byte[] key) throws IOException {
+    try {
+      writeBatch.delete(handle, key);
+    } catch (RocksDBException e) {
+      throw new IOException("Can't record batch delete operation.", e);
+    }
+  }
+
+  public void put(ColumnFamilyHandle handle, byte[] key, byte[] value)
+      throws IOException {
+    try {
+      writeBatch.put(handle, key, value);
+    } catch (RocksDBException e) {
+      throw new IOException("Can't record batch put operation.", e);
+    }
+  }
+}

+ 9 - 6
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java

@@ -237,14 +237,17 @@ public class RDBStore implements DBStore {
   }
 
   @Override
-  public void write(WriteBatch batch) throws IOException {
-    try {
-      db.write(writeOptions, batch);
-    } catch (RocksDBException e) {
-      throw toIOException("Unable to write the batch.", e);
-    }
+  public BatchOperation initBatchOperation() {
+    return new RDBBatchOperation();
   }
 
+  @Override
+  public void commitBatchOperation(BatchOperation operation)
+      throws IOException {
+    ((RDBBatchOperation) operation).commit(db, writeOptions);
+  }
+
+
   @VisibleForTesting
   protected ObjectName getStatMBeanName() {
     return statMBeanName;

+ 22 - 29
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBTable.java

@@ -19,19 +19,19 @@
 
 package org.apache.hadoop.utils.db;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
 import org.apache.hadoop.hdfs.DFSUtil;
+
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
-import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-
 /**
  * RocksDB implementation of ozone metadata store.
  */
@@ -79,7 +79,6 @@ public class RDBTable implements Table {
    *
    * @return ColumnFamilyHandle.
    */
-  @Override
   public ColumnFamilyHandle getHandle() {
     return handle;
   }
@@ -96,6 +95,17 @@ public class RDBTable implements Table {
     }
   }
 
+  @Override
+  public void putWithBatch(BatchOperation batch, byte[] key, byte[] value)
+      throws IOException {
+    if (batch instanceof RDBBatchOperation) {
+      ((RDBBatchOperation) batch).put(getHandle(), key, value);
+    } else {
+      throw new IllegalArgumentException("batch should be RDBBatchOperation");
+    }
+  }
+
+
   @Override
   public boolean isEmpty() throws IOException {
     try (TableIterator<KeyValue> keyIter = iterator()) {
@@ -124,32 +134,15 @@ public class RDBTable implements Table {
   }
 
   @Override
-  public void writeBatch(WriteBatch operation) throws IOException {
-    try {
-      db.write(writeOptions, operation);
-    } catch (RocksDBException e) {
-      throw toIOException("Batch write operation failed", e);
+  public void deleteWithBatch(BatchOperation batch, byte[] key)
+      throws IOException {
+    if (batch instanceof RDBBatchOperation) {
+      ((RDBBatchOperation) batch).delete(getHandle(), key);
+    } else {
+      throw new IllegalArgumentException("batch should be RDBBatchOperation");
     }
-  }
 
-//  @Override
-//  public void iterate(byte[] from, EntryConsumer consumer)
-//      throws IOException {
-//
-//    try (RocksIterator it = db.newIterator(handle)) {
-//      if (from != null) {
-//        it.seek(from);
-//      } else {
-//        it.seekToFirst();
-//      }
-//      while (it.isValid()) {
-//        if (!consumer.consume(it.key(), it.value())) {
-//          break;
-//        }
-//        it.next();
-//      }
-//    }
-//  }
+  }
 
   @Override
   public TableIterator<KeyValue> iterator() {

+ 17 - 15
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/Table.java

@@ -19,12 +19,10 @@
 
 package org.apache.hadoop.utils.db;
 
-import org.apache.hadoop.classification.InterfaceStability;
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.WriteBatch;
-
 import java.io.IOException;
 
+import org.apache.hadoop.classification.InterfaceStability;
+
 /**
  * Interface for key-value store that stores ozone metadata. Ozone metadata is
  * stored as key value pairs, both key and value are arbitrary byte arrays. Each
@@ -42,6 +40,16 @@ public interface Table extends AutoCloseable {
    */
   void put(byte[] key, byte[] value) throws IOException;
 
+  /**
+   * Puts a key-value pair into the store as part of a bath operation.
+   *
+   * @param batch the batch operation
+   * @param key metadata key
+   * @param value metadata value
+   */
+  void putWithBatch(BatchOperation batch, byte[] key, byte[] value)
+      throws IOException;
+
   /**
    * @return true if the metadata store is empty.
    * @throws IOException on Failure
@@ -67,19 +75,13 @@ public interface Table extends AutoCloseable {
   void delete(byte[] key) throws IOException;
 
   /**
-   * Return the Column Family handle. TODO: This leaks an RockDB abstraction
-   * into Ozone code, cleanup later.
-   *
-   * @return ColumnFamilyHandle
-   */
-  ColumnFamilyHandle getHandle();
-
-  /**
-   * A batch of PUT, DELETE operations handled as a single atomic write.
+   * Deletes a key from the metadata store as part of a batch operation.
    *
-   * @throws IOException write fails
+   * @param batch the batch operation
+   * @param key metadata key
+   * @throws IOException on Failure
    */
-  void writeBatch(WriteBatch operation) throws IOException;
+  void deleteWithBatch(BatchOperation batch, byte[] key) throws IOException;
 
   /**
    * Returns the iterator for this metadata store.

+ 45 - 17
hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestRDBTableStore.java

@@ -19,8 +19,16 @@
 
 package org.apache.hadoop.utils.db;
 
-import org.apache.commons.lang3.RandomStringUtils;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 import org.apache.hadoop.hdfs.DFSUtil;
+
+import org.apache.commons.lang3.RandomStringUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -32,14 +40,6 @@ import org.rocksdb.DBOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.Statistics;
 import org.rocksdb.StatsLevel;
-import org.rocksdb.WriteBatch;
-
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
 
 /**
  * Tests for RocksDBTable Store.
@@ -89,7 +89,7 @@ public class TestRDBTableStore {
   public void getHandle() throws Exception {
     try (Table testTable = rdbStore.getTable("First")) {
       Assert.assertNotNull(testTable);
-      Assert.assertNotNull(testTable.getHandle());
+      Assert.assertNotNull(((RDBTable) testTable).getHandle());
     }
   }
 
@@ -149,18 +149,46 @@ public class TestRDBTableStore {
   }
 
   @Test
-  public void writeBatch() throws Exception {
-    WriteBatch batch = new WriteBatch();
-    try (Table testTable = rdbStore.getTable("Fifth")) {
+  public void batchPut() throws Exception {
+    try (Table testTable = rdbStore.getTable("Fifth");
+        BatchOperation batch = rdbStore.initBatchOperation()) {
+      //given
       byte[] key =
           RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
       byte[] value =
           RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
-      batch.put(testTable.getHandle(), key, value);
-      testTable.writeBatch(batch);
+      Assert.assertNull(testTable.get(key));
+
+      //when
+      testTable.putWithBatch(batch, key, value);
+      rdbStore.commitBatchOperation(batch);
+
+      //then
+      Assert.assertNotNull(testTable.get(key));
+    }
+  }
+
+  @Test
+  public void batchDelete() throws Exception {
+    try (Table testTable = rdbStore.getTable("Fifth");
+        BatchOperation batch = rdbStore.initBatchOperation()) {
+
+      //given
+      byte[] key =
+          RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
+      byte[] value =
+          RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
+      testTable.put(key, value);
       Assert.assertNotNull(testTable.get(key));
+
+
+      //when
+      testTable.deleteWithBatch(batch, key);
+      rdbStore.commitBatchOperation(batch);
+
+      //then
+      Assert.assertNull(testTable.get(key));
     }
-    batch.close();
   }
 
   private static boolean consume(Table.KeyValue keyValue) {
@@ -195,4 +223,4 @@ public class TestRDBTableStore {
       }
     }
   }
-}
+}

+ 17 - 13
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java

@@ -16,7 +16,11 @@
  */
 package org.apache.hadoop.ozone.om;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -28,19 +32,16 @@ import org.apache.hadoop.utils.BackgroundTask;
 import org.apache.hadoop.utils.BackgroundTaskQueue;
 import org.apache.hadoop.utils.BackgroundTaskResult;
 import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.utils.db.BatchOperation;
+import org.apache.hadoop.utils.db.DBStore;
 import org.apache.hadoop.utils.db.Table;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.WriteBatch;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.annotations.VisibleForTesting;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This is the background service to delete keys. Scan the metadata of om
@@ -151,20 +152,23 @@ public class KeyDeletingService extends BackgroundService {
     private int deleteAllKeys(List<DeleteBlockGroupResult> results)
         throws RocksDBException, IOException {
       Table deletedTable = manager.getMetadataManager().getDeletedTable();
+
+      DBStore store = manager.getMetadataManager().getStore();
+
       // Put all keys to delete in a single transaction and call for delete.
       int deletedCount = 0;
-      try (WriteBatch writeBatch = new WriteBatch()) {
+      try (BatchOperation writeBatch = store.initBatchOperation()) {
         for (DeleteBlockGroupResult result : results) {
           if (result.isSuccess()) {
             // Purge key from OM DB.
-            writeBatch.delete(deletedTable.getHandle(),
+            deletedTable.deleteWithBatch(writeBatch,
                 DFSUtil.string2Bytes(result.getObjectKey()));
             LOG.debug("Key {} deleted from OM DB", result.getObjectKey());
             deletedCount++;
           }
         }
         // Write a single transaction for delete.
-        manager.getMetadataManager().getStore().write(writeBatch);
+        store.commitBatchOperation(writeBatch);
       }
       return deletedCount;
     }

+ 19 - 19
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java

@@ -16,7 +16,12 @@
  */
 package org.apache.hadoop.ozone.om;
 
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -34,22 +39,14 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
-import org.apache.hadoop.ozone.protocol.proto
-    .OzoneManagerProtocolProtos.KeyLocationList;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocationList;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.BackgroundService;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.WriteBatch;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.utils.db.BatchOperation;
+import org.apache.hadoop.utils.db.DBStore;
 
+import com.google.common.base.Preconditions;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
@@ -58,8 +55,10 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVI
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Implementation of keyManager.
@@ -436,13 +435,14 @@ public class KeyManagerImpl implements KeyManager {
           OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(fromKeyValue));
       newKeyInfo.setKeyName(toKeyName);
       newKeyInfo.updateModifcationTime();
-      try (WriteBatch batch = new WriteBatch()) {
-        batch.delete(metadataManager.getKeyTable().getHandle(), fromKey);
-        batch.put(metadataManager.getKeyTable().getHandle(), toKey,
+      DBStore store = metadataManager.getStore();
+      try (BatchOperation batch = store.initBatchOperation()) {
+        metadataManager.getKeyTable().deleteWithBatch(batch, fromKey);
+        metadataManager.getKeyTable().putWithBatch(batch, toKey,
             newKeyInfo.getProtobuf().toByteArray());
-        metadataManager.getStore().write(batch);
+        store.commitBatchOperation(batch);
       }
-    } catch (RocksDBException | IOException ex) {
+    } catch (IOException ex) {
       LOG.error("Rename key failed for volume:{} bucket:{} fromKey:{} toKey:{}",
           volumeName, bucketName, fromKeyName, toKeyName, ex);
       throw new OMException(ex.getMessage(),

+ 34 - 41
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManagerImpl.java

@@ -16,35 +16,30 @@
  */
 package org.apache.hadoop.ozone.om;
 
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.protocol.proto
-    .OzoneManagerProtocolProtos.OzoneAclInfo;
-import org.apache.hadoop.ozone.protocol.proto
-    .OzoneManagerProtocolProtos.VolumeList;
-import org.apache.hadoop.ozone.protocol.proto
-    .OzoneManagerProtocolProtos.VolumeInfo;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.RocksDBStore;
+import org.apache.hadoop.utils.db.BatchOperation;
+
+import com.google.common.base.Preconditions;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME_DEFAULT;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.rocksdb.RocksDBException;
-import org.rocksdb.WriteBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.hadoop.ozone.om.OMConfigKeys
-    .OZONE_OM_USER_MAX_VOLUME_DEFAULT;
-import static org.apache.hadoop.ozone.om.OMConfigKeys
-    .OZONE_OM_USER_MAX_VOLUME;
-import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
-
 /**
  * OM volume management code.
  */
@@ -69,7 +64,7 @@ public class VolumeManagerImpl implements VolumeManager {
 
   // Helpers to add and delete volume from user list
   private void addVolumeToOwnerList(String volume, String owner,
-      WriteBatch batchOperation) throws RocksDBException, IOException {
+      BatchOperation batchOperation) throws IOException {
     // Get the volume list
     byte[] dbUserKey = metadataManager.getUserKey(owner);
     byte[] volumeList  = metadataManager.getUserTable().get(dbUserKey);
@@ -89,12 +84,12 @@ public class VolumeManagerImpl implements VolumeManager {
     prevVolList.add(volume);
     VolumeList newVolList = VolumeList.newBuilder()
         .addAllVolumeNames(prevVolList).build();
-    batchOperation.put(metadataManager.getUserTable().getHandle(),
+    metadataManager.getUserTable().putWithBatch(batchOperation,
         dbUserKey, newVolList.toByteArray());
   }
 
   private void delVolumeFromOwnerList(String volume, String owner,
-      WriteBatch batch) throws RocksDBException, IOException {
+      BatchOperation batch) throws RocksDBException, IOException {
     // Get the volume list
     byte[] dbUserKey = metadataManager.getUserKey(owner);
     byte[] volumeList  = metadataManager.getUserTable().get(dbUserKey);
@@ -110,11 +105,11 @@ public class VolumeManagerImpl implements VolumeManager {
     // Remove the volume from the list
     prevVolList.remove(volume);
     if (prevVolList.size() == 0) {
-      batch.delete(metadataManager.getUserTable().getHandle(), dbUserKey);
+      metadataManager.getUserTable().deleteWithBatch(batch, dbUserKey);
     } else {
       VolumeList newVolList = VolumeList.newBuilder()
           .addAllVolumeNames(prevVolList).build();
-      batch.put(metadataManager.getUserTable().getHandle(),
+      metadataManager.getUserTable().putWithBatch(batch,
           dbUserKey, newVolList.toByteArray());
     }
   }
@@ -138,7 +133,8 @@ public class VolumeManagerImpl implements VolumeManager {
         throw new OMException(ResultCodes.FAILED_VOLUME_ALREADY_EXISTS);
       }
 
-      try(WriteBatch batch = new WriteBatch()) {
+      try (BatchOperation batch = metadataManager.getStore()
+          .initBatchOperation()) {
         // Write the vol info
         List<HddsProtos.KeyValue> metadataList = new ArrayList<>();
         for (Map.Entry<String, String> entry :
@@ -157,23 +153,19 @@ public class VolumeManagerImpl implements VolumeManager {
             .addAllVolumeAcls(aclList)
             .setCreationTime(Time.now())
             .build();
-        batch.put(metadataManager.getVolumeTable().getHandle(),
+        metadataManager.getVolumeTable().putWithBatch(batch,
             dbVolumeKey, newVolumeInfo.toByteArray());
 
         // Add volume to user list
         addVolumeToOwnerList(args.getVolume(), args.getOwnerName(), batch);
-        metadataManager.getStore().write(batch);
+        metadataManager.getStore().commitBatchOperation(batch);
       }
       LOG.debug("created volume:{} user:{}", args.getVolume(),
           args.getOwnerName());
-    } catch (RocksDBException | IOException ex) {
+    } catch (IOException ex) {
       if (!(ex instanceof OMException)) {
         LOG.error("Volume creation failed for user:{} volume:{}",
             args.getOwnerName(), args.getVolume(), ex);
-      }
-      if(ex instanceof RocksDBException) {
-        throw RocksDBStore.toIOException("Volume creation failed.",
-            (RocksDBException) ex);
       } else {
         throw (IOException) ex;
       }
@@ -209,7 +201,8 @@ public class VolumeManagerImpl implements VolumeManager {
       OmVolumeArgs volumeArgs = OmVolumeArgs.getFromProtobuf(volumeInfo);
       Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
 
-      try(WriteBatch batch = new WriteBatch()) {
+      try (BatchOperation batch = metadataManager.getStore()
+          .initBatchOperation()) {
         delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(), batch);
         addVolumeToOwnerList(volume, owner, batch);
 
@@ -222,9 +215,9 @@ public class VolumeManagerImpl implements VolumeManager {
                 .build();
 
         VolumeInfo newVolumeInfo = newVolumeArgs.getProtobuf();
-        batch.put(metadataManager.getVolumeTable().getHandle(),
+        metadataManager.getVolumeTable().putWithBatch(batch,
             dbVolumeKey, newVolumeInfo.toByteArray());
-        metadataManager.getStore().write(batch);
+        metadataManager.getStore().commitBatchOperation(batch);
       }
     } catch (RocksDBException | IOException ex) {
       if (!(ex instanceof OMException)) {
@@ -356,11 +349,11 @@ public class VolumeManagerImpl implements VolumeManager {
       Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
       // delete the volume from the owner list
       // as well as delete the volume entry
-      try(WriteBatch batch = new WriteBatch()) {
+      try (BatchOperation batch = metadataManager.getStore()
+          .initBatchOperation()) {
         delVolumeFromOwnerList(volume, volumeInfo.getOwnerName(), batch);
-        batch.delete(metadataManager.getVolumeTable().getHandle(),
-            dbVolumeKey);
-        metadataManager.getStore().write(batch);
+        metadataManager.getVolumeTable().deleteWithBatch(batch, dbVolumeKey);
+        metadataManager.getStore().commitBatchOperation(batch);
       }
     } catch (RocksDBException| IOException ex) {
       if (!(ex instanceof OMException)) {