فهرست منبع

HDDS-1512. Implement DoubleBuffer in OzoneManager. (#810)

Bharat Viswanadham 6 سال پیش
والد
کامیت
6d0e79c121

+ 212 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java

@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.ratis;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.ratis.helpers.DoubleBufferEntry;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.utils.db.BatchOperation;
+
+import org.apache.ratis.util.ExitUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements DoubleBuffer implementation of OMClientResponse's. In
+ * DoubleBuffer it has 2 buffers one is currentBuffer and other is
+ * readyBuffer. The current OM requests will be always added to currentBuffer.
+ * Flush thread will be running in background, it check's if currentBuffer has
+ * any entries, it swaps the buffer and creates a batch and commit to DB.
+ * Adding OM request to doubleBuffer and swap of buffer are synchronized
+ * methods.
+ *
+ */
+public class OzoneManagerDoubleBuffer {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OzoneManagerDoubleBuffer.class.getName());
+
+  // Taken unbounded queue, if sync thread is taking too long time, we
+  // might end up taking huge memory to add entries to the buffer.
+  // TODO: We can avoid this using unbounded queue and use queue with
+  // capacity, if queue is full we can wait for sync to be completed to
+  // add entries. But in this also we might block rpc handlers, as we
+  // clear entries after sync. Or we can come up with a good approach to
+  // solve this.
+  private Queue<DoubleBufferEntry<OMClientResponse>> currentBuffer;
+  private Queue<DoubleBufferEntry<OMClientResponse>> readyBuffer;
+
+  private Daemon daemon;
+  private final OMMetadataManager omMetadataManager;
+  private final AtomicLong flushedTransactionCount = new AtomicLong(0);
+  private final AtomicLong flushIterations = new AtomicLong(0);
+  private volatile boolean isRunning;
+
+
+  public OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager) {
+    this.currentBuffer = new ConcurrentLinkedQueue<>();
+    this.readyBuffer = new ConcurrentLinkedQueue<>();
+    this.omMetadataManager = omMetadataManager;
+
+    isRunning = true;
+    // Daemon thread which runs in back ground and flushes transactions to DB.
+    daemon = new Daemon(this::flushTransactions);
+    daemon.setName("OMDoubleBufferFlushThread");
+    daemon.start();
+
+  }
+
+  /**
+   * Runs in a background thread and batches the transaction in currentBuffer
+   * and commit to DB.
+   */
+  private void flushTransactions() {
+    while(isRunning) {
+      try {
+        if (canFlush()) {
+          setReadyBuffer();
+          final BatchOperation batchOperation = omMetadataManager.getStore()
+              .initBatchOperation();
+
+          readyBuffer.iterator().forEachRemaining((entry) -> {
+            try {
+              entry.getResponse().addToDBBatch(omMetadataManager,
+                  batchOperation);
+            } catch (IOException ex) {
+              // During Adding to RocksDB batch entry got an exception.
+              // We should terminate the OM.
+              terminate(ex);
+            }
+          });
+
+          omMetadataManager.getStore().commitBatchOperation(batchOperation);
+          int flushedTransactionsSize = readyBuffer.size();
+          flushedTransactionCount.addAndGet(flushedTransactionsSize);
+          flushIterations.incrementAndGet();
+
+          LOG.debug("Sync Iteration {} flushed transactions in this " +
+                  "iteration{}", flushIterations.get(),
+              flushedTransactionsSize);
+          readyBuffer.clear();
+          // TODO: update the last updated index in OzoneManagerStateMachine.
+        }
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        if (isRunning) {
+          final String message = "OMDoubleBuffer flush thread " +
+              Thread.currentThread().getName() + " encountered Interrupted " +
+              "exception while running";
+          ExitUtils.terminate(1, message, ex, LOG);
+        } else {
+          LOG.info("OMDoubleBuffer flush thread " +
+              Thread.currentThread().getName() + " is interrupted and will " +
+              "exit. {}", Thread.currentThread().getName());
+        }
+      } catch (IOException ex) {
+        terminate(ex);
+      } catch (Throwable t) {
+        final String s = "OMDoubleBuffer flush thread" +
+            Thread.currentThread().getName() + "encountered Throwable error";
+        ExitUtils.terminate(2, s, t, LOG);
+      }
+    }
+  }
+
+  /**
+   * Stop OM DoubleBuffer flush thread.
+   */
+  public synchronized void stop() {
+    if (isRunning) {
+      LOG.info("Stopping OMDoubleBuffer flush thread");
+      isRunning = false;
+      daemon.interrupt();
+    } else {
+      LOG.info("OMDoubleBuffer flush thread is not running.");
+    }
+
+  }
+
+  private void terminate(IOException ex) {
+    String message = "During flush to DB encountered error in " +
+        "OMDoubleBuffer flush thread " + Thread.currentThread().getName();
+    ExitUtils.terminate(1, message, ex, LOG);
+  }
+
+  /**
+   * Returns the flushed transaction count to OM DB.
+   * @return flushedTransactionCount
+   */
+  public long getFlushedTransactionCount() {
+    return flushedTransactionCount.get();
+  }
+
+  /**
+   * Returns total number of flush iterations run by sync thread.
+   * @return flushIterations
+   */
+  public long getFlushIterations() {
+    return flushIterations.get();
+  }
+
+  /**
+   * Add OmResponseBufferEntry to buffer.
+   * @param response
+   * @param transactionIndex
+   */
+  public synchronized void add(OMClientResponse response,
+      long transactionIndex) {
+    currentBuffer.add(new DoubleBufferEntry<>(transactionIndex, response));
+    notify();
+  }
+
+  /**
+   * Check can we flush transactions or not. This method wait's until
+   * currentBuffer size is greater than zero, once currentBuffer size is
+   * greater than zero it gets notify signal, and it returns true
+   * indicating that we are ready to flush.
+   *
+   * @return boolean
+   */
+  private synchronized boolean canFlush() throws InterruptedException {
+    // When transactions are added to buffer it notifies, then we check if
+    // currentBuffer size once and return from this method.
+    while (currentBuffer.size() == 0) {
+      wait(Long.MAX_VALUE);
+    }
+    return true;
+  }
+
+  /**
+   * Prepares the readyBuffer which is used by sync thread to flush
+   * transactions to OM DB. This method swaps the currentBuffer and readyBuffer.
+   */
+  private synchronized void setReadyBuffer() {
+    Queue<DoubleBufferEntry<OMClientResponse>> temp = currentBuffer;
+    currentBuffer = readyBuffer;
+    readyBuffer = temp;
+  }
+
+}
+

+ 44 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/helpers/DoubleBufferEntry.java

@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.ratis.helpers;
+
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+
+/**
+ * Entry in OzoneManagerDouble Buffer.
+ * @param <Response>
+ */
+public class DoubleBufferEntry<Response extends OMClientResponse> {
+
+  private long trxLogIndex;
+  private Response response;
+
+  public DoubleBufferEntry(long trxLogIndex, Response response) {
+    this.trxLogIndex = trxLogIndex;
+    this.response = response;
+  }
+
+  public long getTrxLogIndex() {
+    return trxLogIndex;
+  }
+
+  public Response getResponse() {
+    return response;
+  }
+}

+ 20 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/helpers/package-info.java

@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+/**
+ * package which contains helper classes for each OM request response.
+ */
+package org.apache.hadoop.ozone.om.ratis.helpers;

+ 52 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMBucketCreateResponse.java

@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response;
+
+import java.io.IOException;
+
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.utils.db.BatchOperation;
+
+/**
+ * Response for CreateBucket request.
+ */
+public final class OMBucketCreateResponse implements OMClientResponse {
+
+  private final OmBucketInfo omBucketInfo;
+
+  public OMBucketCreateResponse(OmBucketInfo omBucketInfo) {
+    this.omBucketInfo = omBucketInfo;
+  }
+
+  @Override
+  public void addToDBBatch(OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+    String dbBucketKey =
+        omMetadataManager.getBucketKey(omBucketInfo.getVolumeName(),
+            omBucketInfo.getBucketName());
+    omMetadataManager.getBucketTable().putWithBatch(batchOperation, dbBucketKey,
+        omBucketInfo);
+  }
+
+  public OmBucketInfo getOmBucketInfo() {
+    return omBucketInfo;
+  }
+}
+

+ 57 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMBucketDeleteResponse.java

@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response;
+
+import java.io.IOException;
+
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.utils.db.BatchOperation;
+
+/**
+ * Response for DeleteBucket request.
+ */
+public final class OMBucketDeleteResponse implements OMClientResponse {
+
+  private String volumeName;
+  private String bucketName;
+
+  public OMBucketDeleteResponse(
+      String volumeName, String bucketName) {
+    this.volumeName = volumeName;
+    this.bucketName = bucketName;
+  }
+
+  @Override
+  public void addToDBBatch(OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+    String dbBucketKey =
+        omMetadataManager.getBucketKey(volumeName, bucketName);
+    omMetadataManager.getBucketTable().deleteWithBatch(batchOperation,
+        dbBucketKey);
+  }
+
+  public String getVolumeName() {
+    return volumeName;
+  }
+
+  public String getBucketName() {
+    return bucketName;
+  }
+}
+

+ 45 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMClientResponse.java

@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.utils.db.BatchOperation;
+
+import java.io.IOException;
+
+/**
+ * Interface for OM Responses, each OM response should implement this interface.
+ */
+public interface OMClientResponse {
+
+  /**
+   * Implement logic to add the response to batch.
+   * @param omMetadataManager
+   * @param batchOperation
+   * @throws IOException
+   */
+  default void addToDBBatch(OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+    throw new NotImplementedException("Not implemented, Each OM Response " +
+        "should implement this method");
+  }
+
+}
+

+ 64 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMVolumeCreateResponse.java

@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response;
+
+import java.io.IOException;
+
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.utils.db.BatchOperation;
+
+/**
+ * Response for CreateBucket request.
+ */
+public class OMVolumeCreateResponse implements OMClientResponse {
+
+  private OzoneManagerProtocolProtos.VolumeList volumeList;
+  private OmVolumeArgs omVolumeArgs;
+
+  public OMVolumeCreateResponse(OmVolumeArgs omVolumeArgs,
+      OzoneManagerProtocolProtos.VolumeList volumeList) {
+    this.omVolumeArgs = omVolumeArgs;
+    this.volumeList = volumeList;
+  }
+  @Override
+  public void addToDBBatch(OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+
+    String dbVolumeKey =
+        omMetadataManager.getVolumeKey(omVolumeArgs.getVolume());
+    String dbUserKey =
+        omMetadataManager.getUserKey(omVolumeArgs.getOwnerName());
+
+    omMetadataManager.getVolumeTable().putWithBatch(batchOperation, dbVolumeKey,
+        omVolumeArgs);
+    omMetadataManager.getUserTable().putWithBatch(batchOperation, dbUserKey,
+        volumeList);
+  }
+
+  public OzoneManagerProtocolProtos.VolumeList getVolumeList() {
+    return volumeList;
+  }
+
+  public OmVolumeArgs getOmVolumeArgs() {
+    return omVolumeArgs;
+  }
+}
+

+ 59 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMVolumeDeleteResponse.java

@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response;
+
+import java.io.IOException;
+
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.utils.db.BatchOperation;
+
+/**
+ * Response for CreateVolume request.
+ */
+public class OMVolumeDeleteResponse implements OMClientResponse {
+  private String volume;
+  private String owner;
+  private OzoneManagerProtocolProtos.VolumeList updatedVolumeList;
+
+  public OMVolumeDeleteResponse(String volume, String owner,
+      OzoneManagerProtocolProtos.VolumeList updatedVolumeList) {
+    this.volume = volume;
+    this.owner = owner;
+    this.updatedVolumeList = updatedVolumeList;
+  }
+
+  @Override
+  public void addToDBBatch(OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+    String dbUserKey = omMetadataManager.getUserKey(owner);
+    OzoneManagerProtocolProtos.VolumeList volumeList =
+        updatedVolumeList;
+    if (updatedVolumeList.getVolumeNamesList().size() == 0) {
+      omMetadataManager.getUserTable().deleteWithBatch(batchOperation,
+          dbUserKey);
+    } else {
+      omMetadataManager.getUserTable().putWithBatch(batchOperation, dbUserKey,
+          volumeList);
+    }
+    omMetadataManager.getVolumeTable().deleteWithBatch(batchOperation,
+        omMetadataManager.getVolumeKey(volume));
+  }
+}
+

+ 24 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/package-info.java

@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response;
+
+
+/**
+ * This package contains classes for the OM Responses.
+ */

+ 130 - 0
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java

@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.ratis;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.db.BatchOperation;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+
+/**
+ * This class tests OzoneManagerDoubleBuffer implementation with
+ * dummy response class.
+ */
+public class TestOzoneManagerDoubleBufferWithDummyResponse {
+
+  private OMMetadataManager omMetadataManager;
+  private OzoneManagerDoubleBuffer doubleBuffer;
+  private AtomicLong trxId = new AtomicLong(0);
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  @Before
+  public void setup() throws IOException {
+    OzoneConfiguration configuration = new OzoneConfiguration();
+    configuration.set(OZONE_METADATA_DIRS,
+        folder.newFolder().getAbsolutePath());
+    omMetadataManager =
+        new OmMetadataManagerImpl(configuration);
+    doubleBuffer = new OzoneManagerDoubleBuffer(omMetadataManager);
+  }
+
+  @After
+  public void stop() {
+    doubleBuffer.stop();
+  }
+
+  /**
+   * This tests add's 100 bucket creation responses to doubleBuffer, and
+   * check OM DB bucket table has 100 entries or not. In addition checks
+   * flushed transaction count is matching with expected count or not.
+   * @throws Exception
+   */
+  @Test(timeout = 300_000)
+  public void testDoubleBufferWithDummyResponse() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    int bucketCount = 100;
+    for (int i=0; i < bucketCount; i++) {
+      doubleBuffer.add(createDummyBucketResponse(volumeName,
+          UUID.randomUUID().toString()), trxId.incrementAndGet());
+    }
+    GenericTestUtils.waitFor(() ->
+            doubleBuffer.getFlushedTransactionCount() == bucketCount, 100,
+        60000);
+    Assert.assertTrue(omMetadataManager.countRowsInTable(
+        omMetadataManager.getBucketTable()) == (bucketCount));
+    Assert.assertTrue(doubleBuffer.getFlushIterations() > 0);
+  }
+
+  /**
+   * Create DummyBucketCreate response.
+   * @param volumeName
+   * @param bucketName
+   * @return OMDummyCreateBucketResponse
+   */
+  private OMDummyCreateBucketResponse createDummyBucketResponse(
+      String volumeName, String bucketName) {
+    OmBucketInfo omBucketInfo =
+        OmBucketInfo.newBuilder().setVolumeName(volumeName)
+            .setBucketName(bucketName).setCreationTime(Time.now()).build();
+    return new OMDummyCreateBucketResponse(omBucketInfo);
+  }
+
+
+  /**
+   * DummyCreatedBucket Response class used in testing.
+   */
+  public static class OMDummyCreateBucketResponse implements OMClientResponse {
+    private final OmBucketInfo omBucketInfo;
+
+    public OMDummyCreateBucketResponse(OmBucketInfo omBucketInfo) {
+      this.omBucketInfo = omBucketInfo;
+    }
+
+    @Override
+    public void addToDBBatch(OMMetadataManager omMetadataManager,
+        BatchOperation batchOperation) throws IOException {
+      String dbBucketKey =
+          omMetadataManager.getBucketKey(omBucketInfo.getVolumeName(),
+              omBucketInfo.getBucketName());
+      omMetadataManager.getBucketTable().putWithBatch(batchOperation,
+          dbBucketKey, omBucketInfo);
+    }
+
+  }
+}

+ 409 - 0
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java

@@ -0,0 +1,409 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.ratis;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.response.OMVolumeCreateResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList;
+import org.apache.hadoop.ozone.om.response.OMBucketCreateResponse;
+import org.apache.hadoop.ozone.om.response.OMBucketDeleteResponse;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.Time;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+import static org.junit.Assert.fail;
+
+/**
+ * This class tests OzoneManagerDouble Buffer with actual OMResponse classes.
+ */
+public class TestOzoneManagerDoubleBufferWithOMResponse {
+
+  private OMMetadataManager omMetadataManager;
+  private OzoneManagerDoubleBuffer doubleBuffer;
+  private AtomicLong trxId = new AtomicLong(0);
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  @Before
+  public void setup() throws IOException  {
+    OzoneConfiguration configuration = new OzoneConfiguration();
+    configuration.set(OZONE_METADATA_DIRS,
+        folder.newFolder().getAbsolutePath());
+    omMetadataManager =
+        new OmMetadataManagerImpl(configuration);
+    doubleBuffer = new OzoneManagerDoubleBuffer(omMetadataManager);
+  }
+
+  @After
+  public void stop() {
+    doubleBuffer.stop();
+  }
+
+  /**
+   * This tests OzoneManagerDoubleBuffer implementation. It calls
+   * testDoubleBuffer with number of iterations to do transactions and
+   * number of buckets to be created in each iteration. It then
+   * verifies OM DB entries count is matching with total number of
+   * transactions or not.
+   * @throws Exception
+   */
+  @Test(timeout = 300_000)
+  public void testDoubleBuffer() throws Exception {
+    // This test checks whether count in tables are correct or not.
+    testDoubleBuffer(1, 10);
+    testDoubleBuffer(10, 100);
+    testDoubleBuffer(100, 100);
+    testDoubleBuffer(1000, 1000);
+  }
+
+  /**
+   * This test first creates a volume, and then does a mix of transactions
+   * like create/delete buckets and add them to double buffer. Then it
+   * verifies OM DB entries are matching with actual responses added to
+   * double buffer or not.
+   * @throws Exception
+   */
+  @Test
+  public void testDoubleBufferWithMixOfTransactions() throws Exception {
+    // This test checks count, data in table is correct or not.
+    Queue< OMBucketCreateResponse > bucketQueue =
+        new ConcurrentLinkedQueue<>();
+    Queue< OMBucketDeleteResponse > deleteBucketQueue =
+        new ConcurrentLinkedQueue<>();
+
+    String volumeName = UUID.randomUUID().toString();
+    OMVolumeCreateResponse omVolumeCreateResponse = createVolume(volumeName);
+    doubleBuffer.add(omVolumeCreateResponse, trxId.incrementAndGet());
+
+
+    int bucketCount = 10;
+
+    doMixTransactions(volumeName, 10, deleteBucketQueue, bucketQueue);
+
+    // As for every 2 transactions of create bucket we add deleted bucket.
+    final int deleteCount = 5;
+
+    // We are doing +1 for volume transaction.
+    GenericTestUtils.waitFor(() ->
+        doubleBuffer.getFlushedTransactionCount() ==
+            (bucketCount + deleteCount + 1), 100, 120000);
+
+    Assert.assertTrue(omMetadataManager.countRowsInTable(
+        omMetadataManager.getVolumeTable()) == 1);
+
+    Assert.assertTrue(omMetadataManager.countRowsInTable(
+        omMetadataManager.getBucketTable()) == 5);
+
+    // Now after this in our DB we should have 5 buckets and one volume
+
+    checkVolume(volumeName, omVolumeCreateResponse);
+
+    checkCreateBuckets(bucketQueue);
+
+    checkDeletedBuckets(deleteBucketQueue);
+  }
+
+  /**
+   * This test first creates a volume, and then does a mix of transactions
+   * like create/delete buckets in parallel and add to double buffer. Then it
+   * verifies OM DB entries are matching with actual responses added to
+   * double buffer or not.
+   * @throws Exception
+   */
+  @Test
+  public void testDoubleBufferWithMixOfTransactionsParallel() throws Exception {
+    // This test checks count, data in table is correct or not.
+
+    Queue< OMBucketCreateResponse > bucketQueue =
+        new ConcurrentLinkedQueue<>();
+    Queue< OMBucketDeleteResponse > deleteBucketQueue =
+        new ConcurrentLinkedQueue<>();
+
+    String volumeName1 = UUID.randomUUID().toString();
+    OMVolumeCreateResponse omVolumeCreateResponse1 =
+        createVolume(volumeName1);
+
+    String volumeName2 = UUID.randomUUID().toString();
+    OMVolumeCreateResponse omVolumeCreateResponse2 =
+        createVolume(volumeName2);
+
+    doubleBuffer.add(omVolumeCreateResponse1, trxId.incrementAndGet());
+
+    doubleBuffer.add(omVolumeCreateResponse2, trxId.incrementAndGet());
+
+    Daemon daemon1 = new Daemon(() -> doMixTransactions(volumeName1, 10,
+        deleteBucketQueue, bucketQueue));
+    Daemon daemon2 = new Daemon(() -> doMixTransactions(volumeName2, 10,
+        deleteBucketQueue, bucketQueue));
+
+    daemon1.start();
+    daemon2.start();
+
+    int bucketCount = 20;
+
+      // As for every 2 transactions of create bucket we add deleted bucket.
+    final int deleteCount = 10;
+
+    // We are doing +1 for volume transaction.
+    GenericTestUtils.waitFor(() -> doubleBuffer.getFlushedTransactionCount()
+            == (bucketCount + deleteCount + 2), 100, 120000);
+
+    Assert.assertTrue(omMetadataManager.countRowsInTable(
+        omMetadataManager.getVolumeTable()) == 2);
+
+    Assert.assertTrue(omMetadataManager.countRowsInTable(
+        omMetadataManager.getBucketTable()) == 10);
+
+    // Now after this in our DB we should have 5 buckets and one volume
+
+
+    checkVolume(volumeName1, omVolumeCreateResponse1);
+    checkVolume(volumeName2, omVolumeCreateResponse2);
+
+    checkCreateBuckets(bucketQueue);
+
+    checkDeletedBuckets(deleteBucketQueue);
+  }
+
+  /**
+   * This method add's a mix of createBucket/DeleteBucket responses to double
+   * buffer. Total number of responses added is specified by bucketCount.
+   * @param volumeName
+   * @param bucketCount
+   * @param deleteBucketQueue
+   * @param bucketQueue
+   */
+  private void doMixTransactions(String volumeName, int bucketCount,
+      Queue<OMBucketDeleteResponse> deleteBucketQueue,
+      Queue<OMBucketCreateResponse> bucketQueue) {
+    for (int i=0; i < bucketCount; i++) {
+      String bucketName = UUID.randomUUID().toString();
+      OMBucketCreateResponse omBucketCreateResponse = createBucket(volumeName,
+          bucketName);
+      doubleBuffer.add(omBucketCreateResponse, trxId.incrementAndGet());
+      // For every 2 transactions have a deleted bucket.
+      if (i % 2 == 0) {
+        OMBucketDeleteResponse omBucketDeleteResponse =
+            deleteBucket(volumeName, bucketName);
+        doubleBuffer.add(omBucketDeleteResponse, trxId.incrementAndGet());
+        deleteBucketQueue.add(omBucketDeleteResponse);
+      } else {
+        bucketQueue.add(omBucketCreateResponse);
+      }
+    }
+  }
+
+  /**
+   * Verifies volume table data is matching with actual response added to
+   * double buffer.
+   * @param volumeName
+   * @param omVolumeCreateResponse
+   * @throws Exception
+   */
+  private void checkVolume(String volumeName,
+      OMVolumeCreateResponse omVolumeCreateResponse) throws Exception {
+    OmVolumeArgs tableVolumeArgs = omMetadataManager.getVolumeTable().get(
+        omMetadataManager.getVolumeKey(volumeName));
+    Assert.assertTrue(tableVolumeArgs != null);
+
+    OmVolumeArgs omVolumeArgs = omVolumeCreateResponse.getOmVolumeArgs();
+
+    Assert.assertEquals(omVolumeArgs.getVolume(), tableVolumeArgs.getVolume());
+    Assert.assertEquals(omVolumeArgs.getAdminName(),
+        tableVolumeArgs.getAdminName());
+    Assert.assertEquals(omVolumeArgs.getOwnerName(),
+        tableVolumeArgs.getOwnerName());
+    Assert.assertEquals(omVolumeArgs.getCreationTime(),
+        tableVolumeArgs.getCreationTime());
+  }
+
+  /**
+   * Verifies bucket table data is matching with actual response added to
+   * double buffer.
+   * @param bucketQueue
+   */
+  private void checkCreateBuckets(Queue<OMBucketCreateResponse> bucketQueue) {
+    bucketQueue.forEach((omBucketCreateResponse) -> {
+      OmBucketInfo omBucketInfo = omBucketCreateResponse.getOmBucketInfo();
+      String bucket = omBucketInfo.getBucketName();
+      OmBucketInfo tableBucketInfo = null;
+      try {
+        tableBucketInfo =
+            omMetadataManager.getBucketTable().get(
+                omMetadataManager.getBucketKey(omBucketInfo.getVolumeName(),
+                    bucket));
+      } catch (IOException ex) {
+        fail("testDoubleBufferWithMixOfTransactions failed");
+      }
+      Assert.assertNotNull(tableBucketInfo);
+
+      Assert.assertEquals(omBucketInfo.getVolumeName(),
+          tableBucketInfo.getVolumeName());
+      Assert.assertEquals(omBucketInfo.getBucketName(),
+          tableBucketInfo.getBucketName());
+      Assert.assertEquals(omBucketInfo.getCreationTime(),
+          tableBucketInfo.getCreationTime());
+    });
+  }
+
+  /**
+   * Verifies deleted bucket responses added to double buffer are actually
+   * removed from the OM DB or not.
+   * @param deleteBucketQueue
+   */
+  private void checkDeletedBuckets(Queue<OMBucketDeleteResponse>
+      deleteBucketQueue) {
+    deleteBucketQueue.forEach((omBucketDeleteResponse -> {
+      try {
+        Assert.assertNull(omMetadataManager.getBucketTable().get(
+            omMetadataManager.getBucketKey(
+                omBucketDeleteResponse.getVolumeName(),
+                omBucketDeleteResponse.getBucketName())));
+      } catch (IOException ex) {
+        fail("testDoubleBufferWithMixOfTransactions failed");
+      }
+    }));
+  }
+
+  /**
+   * Create bucketCount number of createBucket responses for each iteration.
+   * All these iterations are run in parallel. Then verify OM DB has correct
+   * number of entries or not.
+   * @param iterations
+   * @param bucketCount
+   * @throws Exception
+   */
+  public void testDoubleBuffer(int iterations, int bucketCount)
+      throws Exception {
+    try {
+      // Calling setup and stop here because this method is called from a
+      // single test multiple times.
+      setup();
+      for (int i = 0; i < iterations; i++) {
+        Daemon d1 = new Daemon(() ->
+            doTransactions(UUID.randomUUID().toString(), bucketCount));
+        d1.start();
+      }
+
+      // We are doing +1 for volume transaction.
+      GenericTestUtils.waitFor(() ->
+              doubleBuffer.getFlushedTransactionCount() ==
+                  (bucketCount + 1) * iterations, 100,
+          120000);
+
+      Assert.assertTrue(omMetadataManager.countRowsInTable(
+          omMetadataManager.getVolumeTable()) == iterations);
+
+      Assert.assertTrue(omMetadataManager.countRowsInTable(
+          omMetadataManager.getBucketTable()) == (bucketCount) * iterations);
+
+      Assert.assertTrue(doubleBuffer.getFlushIterations() > 0);
+    } finally {
+      stop();
+    }
+  }
+
+  /**
+   * This method adds bucketCount number of createBucket responses to double
+   * buffer.
+   * @param volumeName
+   * @param bucketCount
+   */
+  public void doTransactions(String volumeName, int bucketCount) {
+    doubleBuffer.add(createVolume(volumeName), trxId.incrementAndGet());
+    for (int i=0; i< bucketCount; i++) {
+      doubleBuffer.add(createBucket(volumeName, UUID.randomUUID().toString()),
+          trxId.incrementAndGet());
+      // For every 100 buckets creation adding 100ms delay
+
+      if (i % 100 == 0) {
+        try {
+          Thread.sleep(100);
+        } catch (Exception ex) {
+
+        }
+      }
+    }
+  }
+
+  /**
+   * Create OMVolumeCreateResponse for specified volume.
+   * @param volumeName
+   * @return OMVolumeCreateResponse
+   */
+  private OMVolumeCreateResponse createVolume(String volumeName) {
+    OmVolumeArgs omVolumeArgs =
+        OmVolumeArgs.newBuilder()
+            .setAdminName(UUID.randomUUID().toString())
+            .setOwnerName(UUID.randomUUID().toString())
+            .setVolume(volumeName)
+            .setCreationTime(Time.now()).build();
+
+    VolumeList volumeList = VolumeList.newBuilder()
+        .addVolumeNames(volumeName).build();
+    return new OMVolumeCreateResponse(omVolumeArgs, volumeList);
+  }
+
+  /**
+   * Create OMBucketCreateResponse for specified volume and bucket.
+   * @param volumeName
+   * @param bucketName
+   * @return OMBucketCreateResponse
+   */
+  private OMBucketCreateResponse createBucket(String volumeName,
+      String bucketName) {
+    OmBucketInfo omBucketInfo =
+        OmBucketInfo.newBuilder().setVolumeName(volumeName)
+            .setBucketName(bucketName).setCreationTime(Time.now()).build();
+    return new OMBucketCreateResponse(omBucketInfo);
+  }
+
+  /**
+   * Create OMBucketDeleteResponse for specified volume and bucket.
+   * @param volumeName
+   * @param bucketName
+   * @return OMBucketDeleteResponse
+   */
+  private OMBucketDeleteResponse deleteBucket(String volumeName,
+      String bucketName) {
+    return new OMBucketDeleteResponse(volumeName, bucketName);
+  }
+
+
+}
+