
HDFS-11361. Block Storage: add cache interface. Contributed by Chen Liang.

Anu Engineer 8 years ago
parent
commit
23044c1db8
14 changed files with 1890 additions and 5 deletions
  1. 31 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/Pipeline.java
  2. 63 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
  3. 155 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java
  4. 440 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockIStorageImpl.java
  5. 119 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java
  6. 512 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java
  7. 52 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/CacheModule.java
  8. 50 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/LogicalBlock.java
  9. 180 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java
  10. 77 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/DiskBlock.java
  11. 18 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/package-info.java
  12. 18 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/package-info.java
  13. 18 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/package-info.java
  14. 157 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestStorageImpl.java

+ 31 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/Pipeline.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -35,6 +36,12 @@ public class Pipeline {
   private String containerName;
   private String leaderID;
   private Map<String, DatanodeID> datanodes;
+  /**
+   * Private data attached to this pipeline. It is not serialized via
+   * protobuf; it simply lets callers keep some private state alongside
+   * the pipeline.
+   */
+  private byte[] data;
 
   /**
    * Constructs a new pipeline data structure.
@@ -44,6 +51,7 @@ public class Pipeline {
   public Pipeline(String leaderID) {
     this.leaderID = leaderID;
     datanodes = new TreeMap<>();
+    data = null;
   }
 
   /**
@@ -124,4 +132,27 @@ public class Pipeline {
   public void setContainerName(String containerName) {
     this.containerName = containerName;
   }
+
+  /**
+   * Set private data on pipeline.
+   * @param data -- private data.
+   */
+  public void setData(byte[] data) {
+    if (data != null) {
+      this.data = Arrays.copyOf(data, data.length);
+    }
+  }
+
+  /**
+   * Returns private data that is set on this pipeline.
+   *
+   * @return a blob that the caller can interpret any way it likes.
+   */
+  public byte[] getData() {
+    if (this.data != null) {
+      return Arrays.copyOf(this.data, this.data.length);
+    } else {
+      return null;
+    }
+  }
 }
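
A quick usage sketch of the new field (the leader ID and index here are hypothetical; assuming com.google.common.primitives.Longs is imported, which is how the flusher added below consumes this blob):

    Pipeline pipeline = new Pipeline("dn-leader-uuid");  // hypothetical leader ID
    // Stash this container's position within the volume as an 8-byte blob.
    pipeline.setData(Longs.toByteArray(3L));
    // getData() hands back a defensive copy, so callers cannot mutate state.
    long containerIndex = Longs.fromByteArray(pipeline.getData());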

+ 63 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java

@@ -17,12 +17,12 @@
  */
 package org.apache.hadoop.cblock;
 
+import static java.lang.Thread.NORM_PRIORITY;
+
 /**
  * This class contains constants for configuration keys used in CBlock.
  */
 public final class CBlockConfigKeys {
-  public static final String DFS_CBLOCK_ENABLED_KEY =
-      "dfs.cblock.enabled";
   public static final String DFS_CBLOCK_SERVICERPC_ADDRESS_KEY =
       "dfs.cblock.servicerpc-address";
   public static final String DFS_CBLOCK_SERVICERPC_PORT_KEY =
@@ -33,8 +33,6 @@ public final class CBlockConfigKeys {
       "dfs.cblock.servicerpc.hostname";
   public static final String DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT =
       "0.0.0.0";
-  public static final String DFS_CBLOCK_RPCSERVICE_IP_DEFAULT =
-      "0.0.0.0";
   public static final String DFS_CBLOCK_SERVICERPC_ADDRESS_DEFAULT =
       DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT
           + ":" + DFS_CBLOCK_SERVICERPC_PORT_DEFAULT;
@@ -46,7 +44,7 @@ public final class CBlockConfigKeys {
   public static final int DFS_CBLOCK_JSCSI_PORT_DEFAULT =
       9811;
   public static final String DFS_CBLOCK_JSCSIRPC_ADDRESS_DEFAULT =
-      DFS_CBLOCK_RPCSERVICE_IP_DEFAULT
+      DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT
           + ":" + DFS_CBLOCK_JSCSI_PORT_DEFAULT;
 
 
@@ -69,6 +67,66 @@ public final class CBlockConfigKeys {
   public static final String DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT =
       "/tmp/cblock_levelDB.dat";
 
+
+  public static final String DFS_CBLOCK_DISK_CACHE_PATH_KEY =
+      "dfs.cblock.disk.cache.path";
+  public static final String DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT =
+      "/tmp/cblockCacheDB";
+  /**
+   * Setting this flag to true makes the block layer compute a sha256 hash of
+   * the data and log that information along with block ID. This is very
+   * useful for doing trace based simulation of various workloads. Since it is
+   * computing a hash for each block this could be expensive, hence default
+   * is false.
+   */
+  public static final String DFS_CBLOCK_TRACE_IO = "dfs.cblock.trace.io";
+  public static final boolean DFS_CBLOCK_TRACE_IO_DEFAULT = false;
+
+  /**
+   * Queue size in units of 1024 entries; 256 indicates 256 * 1024 entries.
+   */
+  public static final String DFS_CBLOCK_CACHE_QUEUE_SIZE_KB =
+      "dfs.cblock.cache.cache.size.in.kb";
+  public static final int DFS_CBLOCK_CACHE_QUEUE_SIZE_KB_DEFAULT = 256;
+
+  /**
+   * Minimum number of threads that the cache pool will use for background I/O.
+   */
+  public static final String DFS_CBLOCK_CACHE_CORE_POOL_SIZE =
+      "dfs.cblock.cache.core.pool.size";
+  public static final int DFS_CBLOCK_CACHE_CORE_POOL_SIZE_DEFAULT = 16;
+
+  /**
+   * Maximum number of threads that the cache pool will use for background I/O.
+   */
+  public static final String DFS_CBLOCK_CACHE_MAX_POOL_SIZE =
+      "dfs.cblock.cache.max.pool.size";
+  public static final int DFS_CBLOCK_CACHE_MAX_POOL_SIZE_DEFAULT = 256;
+
+  /**
+   * Number of seconds to keep an idle thread alive.
+   */
+  public static final String DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS =
+      "dfs.cblock.cache.keep.alive.seconds";
+  public static final long DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS_DEFAULT = 60;
+
+  /**
+   * Priority of the cache flusher threads, which affects the relative
+   * performance of writes and reads.
+   */
+  public static final String DFS_CBLOCK_CACHE_THREAD_PRIORITY =
+      "dfs.cblock.cache.thread.priority";
+  public static final int DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT =
+      NORM_PRIORITY;
+
+  /**
+   * Block buffer size in units of 1024 entries; 128 means 128 * 1024 block IDs.
+   */
+  public static final String DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE =
+      "dfs.cblock.cache.block.buffer.size";
+  public static final int DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT = 128;
+
   private CBlockConfigKeys() {
 
   }
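
For reference, a minimal sketch of how a consumer reads these keys, mirroring the ContainerCacheFlusher constructor added below (assumes the key constants are statically imported and a Hadoop Configuration instance is in hand):

    Configuration config = new Configuration();
    int queueSize = config.getInt(DFS_CBLOCK_CACHE_QUEUE_SIZE_KB,
        DFS_CBLOCK_CACHE_QUEUE_SIZE_KB_DEFAULT) * 1024;          // entries
    int corePoolSize = config.getInt(DFS_CBLOCK_CACHE_CORE_POOL_SIZE,
        DFS_CBLOCK_CACHE_CORE_POOL_SIZE_DEFAULT);
    long keepAlive = config.getLong(DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS,
        DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS_DEFAULT);
    boolean traceIO = config.getBoolean(DFS_CBLOCK_TRACE_IO,
        DFS_CBLOCK_TRACE_IO_DEFAULT);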

+ 155 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java

@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.jscsiHelper;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
+import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.util.Time;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Paths;
+
+/**
+ * The blockWriter task.
+ */
+public class BlockWriterTask implements Runnable {
+  private final LogicalBlock block;
+  private int tryCount;
+  private final ContainerCacheFlusher flusher;
+  private final String dbPath;
+  private final String fileName;
+  private static final String RETRY_LOG_PREFIX = "RetryLog";
+
+  /**
+   * Constructs a BlockWriterTask.
+   *
+   * @param block - Block Information.
+   * @param flusher - ContainerCacheFlusher.
+   */
+  public BlockWriterTask(LogicalBlock block, ContainerCacheFlusher flusher,
+      String dbPath, String fileName) {
+    this.block = block;
+    this.flusher = flusher;
+    this.dbPath = dbPath;
+    tryCount = 0;
+    this.fileName = fileName;
+  }
+
+  /**
+   * Writes the cached block to the remote container, recording metrics and
+   * queueing the block to a retry log if the write fails.
+   *
+   * @see Thread#run()
+   */
+  @Override
+  public void run() {
+    String containerName = null;
+    XceiverClientSpi client = null;
+    flusher.getLOG().debug(
+        "Writing block to remote. block ID: {}", block.getBlockID());
+    try {
+      incTryCount();
+      Pipeline pipeline = flusher.getPipeline(this.dbPath, block.getBlockID());
+      client = flusher.getXceiverClientManager().acquireClient(pipeline);
+      byte[] keybuf = Longs.toByteArray(block.getBlockID());
+      byte[] data;
+      long startTime = Time.monotonicNow();
+      data = flusher.getCacheDB(this.dbPath).get(keybuf);
+      long endTime = Time.monotonicNow();
+      Preconditions.checkState(data.length > 0, "Block data is zero length");
+      startTime = Time.monotonicNow();
+      // BUG: fix the trace ID.
+      ContainerProtocolCalls.writeSmallFile(client, containerName,
+          Long.toString(block.getBlockID()), data, "");
+      endTime = Time.monotonicNow();
+      flusher.getTargetMetrics().updateContainerWriteLatency(
+          endTime - startTime);
+      flusher.getLOG().debug("Time taken for Write Small File : {} ms",
+          endTime - startTime);
+
+      flusher.incrementRemoteIO();
+
+    } catch (IOException ex) {
+      flusher.getLOG().error("Writing of block failed, We have attempted " +
+              "to write this block {} times to the container {}.Trace ID:{}",
+          this.getTryCount(), containerName, "", ex);
+      writeRetryBlock(block);
+    } finally {
+      flusher.incFinishCount(fileName);
+      if(client != null) {
+        flusher.getXceiverClientManager().releaseClient(client);
+      }
+    }
+  }
+
+
+  private void writeRetryBlock(LogicalBlock currentBlock) {
+    boolean append = false;
+    String retryFileName =
+        String.format("%s.%d.%s", RETRY_LOG_PREFIX, currentBlock.getBlockID(),
+            Time.monotonicNow());
+    File logDir = new File(this.dbPath);
+    if (!logDir.exists() && !logDir.mkdirs()) {
+      flusher.getLOG().error(
+          "Unable to create the log directory, Crticial error cannot continue");
+      return;
+    }
+    String log = Paths.get(this.dbPath, retryFileName).toString();
+    ByteBuffer buffer = ByteBuffer.allocate(Long.SIZE / Byte.SIZE);
+    buffer.putLong(currentBlock.getBlockID());
+    // Flip so the channel writes the bytes just placed in the buffer;
+    // without this, position equals limit and nothing would be written.
+    buffer.flip();
+    try (FileChannel channel = new FileOutputStream(log, append).getChannel()) {
+      channel.write(buffer);
+      flusher.processDirtyBlocks(this.dbPath, retryFileName);
+    } catch (IOException e) {
+      flusher.getLOG().error("Unable to write the retry block. Block ID: {}",
+          currentBlock.getBlockID(), e);
+    }
+  }
+
+  /**
+   * Increments the try count. This is done each time we try this block
+   * write to the container.
+   */
+  private void incTryCount() {
+    tryCount++;
+  }
+
+  /**
+   * Get the retry count.
+   *
+   * @return int
+   */
+  public int getTryCount() {
+    return tryCount;
+  }
+}
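
A hedged sketch of reading one of these retry logs back (the local variable logPath and the java.io/java.nio imports are assumed; each file holds big-endian 8-byte block IDs, matching the ByteBuffer written above):

    try (FileChannel channel = new FileInputStream(logPath).getChannel()) {
      ByteBuffer buffer = ByteBuffer.allocate(Long.SIZE / Byte.SIZE);
      while (channel.read(buffer) == Long.SIZE / Byte.SIZE) {
        buffer.flip();
        long blockID = buffer.getLong();   // one block ID per 8 bytes
        buffer.clear();
        // ... requeue blockID for another write attempt
      }
    }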

+ 440 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockIStorageImpl.java

@@ -0,0 +1,440 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.jscsiHelper;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.cblock.jscsiHelper.cache.CacheModule;
+import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
+import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.jscsi.target.storage.IStorageModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_TRACE_IO;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_TRACE_IO_DEFAULT;
+
+/**
+ * The SCSI Target class for CBlockSCSIServer.
+ */
+public final class CBlockIStorageImpl implements IStorageModule {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(CBlockIStorageImpl.class);
+  private static final Logger TRACER =
+      LoggerFactory.getLogger("TraceIO");
+
+  private CacheModule cache;
+  private final long volumeSize;
+  private final int blockSize;
+  private final String userName;
+  private final String volumeName;
+  private final boolean traceEnabled;
+  private final Configuration conf;
+  private final ContainerCacheFlusher flusher;
+  private List<Pipeline> fullContainerList;
+
+  /**
+   * private: constructs a SCSI Target.
+   *
+   * @param config - config
+   * @param userName - Username
+   * @param volumeName - Name of the volume
+   * @param volumeSize - Size of the volume
+   * @param blockSize - Size of the block
+   * @param fullContainerList - Ordered list of containers that make up this
+   * volume.
+   * @param flusher - flusher which is used to flush data from
+   *                  level db cache to containers
+   * @throws IOException - Throws IOException.
+   */
+  private CBlockIStorageImpl(Configuration config, String userName,
+      String volumeName, long volumeSize, int blockSize,
+      List<Pipeline> fullContainerList, ContainerCacheFlusher flusher) {
+    this.conf = config;
+    this.userName = userName;
+    this.volumeName = volumeName;
+    this.volumeSize = volumeSize;
+    this.blockSize = blockSize;
+    this.fullContainerList = new ArrayList<>(fullContainerList);
+    this.flusher = flusher;
+    this.traceEnabled = conf.getBoolean(DFS_CBLOCK_TRACE_IO,
+        DFS_CBLOCK_TRACE_IO_DEFAULT);
+  }
+
+  /**
+   * private: initialize the cache.
+   *
+   * @param xceiverClientManager - client manager that is used for creating new
+   * connections to containers.
+   * @param metrics  - target metrics to maintain metrics for target server
+   * @throws IOException - Throws IOException.
+   */
+  private void initCache(XceiverClientManager xceiverClientManager,
+      CBlockTargetMetrics metrics) throws IOException {
+    this.cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(conf)
+        .setVolumeName(this.volumeName)
+        .setUserName(this.userName)
+        .setPipelines(this.fullContainerList)
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(blockSize)
+        .setVolumeSize(volumeSize)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    this.cache.start();
+  }
+
+  /**
+   * Gets a new builder for CBlockStorageImpl.
+   *
+   * @return builder
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Get Cache.
+   *
+   * @return - Cache
+   */
+  public CacheModule getCache() {
+    return cache;
+  }
+
+  /**
+   * Returns block size of this volume.
+   *
+   * @return int size of block for this volume.
+   */
+  @Override
+  public int getBlockSize() {
+    return blockSize;
+  }
+
+  /**
+   * Checks the index boundary of a block address.
+   *
+   * @param logicalBlockAddress the index of the first block of data to be read
+   * or written
+   * @param transferLengthInBlocks the total number of consecutive blocks about
+   * to be read or written
+   * @return 0 == Success, 1 indicates the LBA address is out of bounds and 2
+   * indicates that LBA + transfer size is out of bounds.
+   */
+  @Override
+  public int checkBounds(long logicalBlockAddress, int transferLengthInBlocks) {
+    long sizeInBlocks = volumeSize / blockSize;
+    int res = 0;
+    if (logicalBlockAddress < 0 || logicalBlockAddress >= sizeInBlocks) {
+      res = 1;
+    }
+
+    if (transferLengthInBlocks < 0 ||
+        logicalBlockAddress + transferLengthInBlocks > sizeInBlocks) {
+      if (res == 0) {
+        res = 2;
+      }
+    }
+    return res;
+  }
+
+  /**
+   * Number of blocks that make up this volume.
+   *
+   * @return long - count of blocks.
+   */
+  @Override
+  public long getSizeInBlocks() {
+    return volumeSize / blockSize;
+  }
+
+  /**
+   * Fills the passed byte array with data read from storage, starting at
+   * the indicated location.
+   *
+   * @param bytes the array that will be filled with data from storage
+   * @param storageIndex the position of the first byte to be copied
+   * @throws IOException
+   */
+  @Override
+  public void read(byte[] bytes, long storageIndex) throws IOException {
+    int startingIdxInBlock = (int) (storageIndex % blockSize);
+    int idxInBytes = 0;
+    if (this.traceEnabled) {
+      TRACER.info("Task=ReadStart,length={},location={}",
+          bytes.length, storageIndex);
+    }
+    while (idxInBytes < bytes.length - 1) {
+      long blockId = (storageIndex + idxInBytes) / blockSize;
+      byte[] dataBytes;
+
+      try {
+        LogicalBlock block = this.cache.get(blockId);
+        dataBytes = block.getData().array();
+
+        if (this.traceEnabled) {
+          TRACER.info("Task=ReadBlock,BlockID={},length={},SHA={}",
+              blockId,
+              dataBytes.length,
+              dataBytes.length > 0 ? DigestUtils.sha256Hex(dataBytes) : null);
+        }
+      } catch (IOException e) {
+        // For a non-existing block, cache.get returns a zero-filled block,
+        // so any error here is a real error.
+        LOGGER.error("Error while reading data.", e);
+        throw e;
+      }
+
+      int length = blockSize - startingIdxInBlock;
+      if (length > bytes.length - idxInBytes) {
+        length = bytes.length - idxInBytes;
+      }
+      if (dataBytes.length >= length) {
+        System.arraycopy(dataBytes, startingIdxInBlock, bytes, idxInBytes,
+            length);
+      }
+      startingIdxInBlock = 0;
+      idxInBytes += length;
+    }
+    if (this.traceEnabled) {
+      TRACER.info("Task=ReadEnd,length={},location={},SHA={}",
+          bytes.length, storageIndex, DigestUtils.sha256Hex(bytes));
+    }
+  }
+
+  @Override
+  public void write(byte[] bytes, long storageIndex) throws IOException {
+    int startingIdxInBlock = (int) (storageIndex % blockSize);
+    int idxInBytes = 0;
+    if (this.traceEnabled) {
+      TRACER.info("Task=WriteStart,length={},location={},SHA={}",
+          bytes.length, storageIndex,
+          bytes.length > 0 ? DigestUtils.sha256Hex(bytes) : null);
+    }
+
+    ByteBuffer dataByte = ByteBuffer.allocate(blockSize);
+    while (idxInBytes < bytes.length - 1) {
+      long blockId = (storageIndex + idxInBytes) / blockSize;
+      int length = blockSize - startingIdxInBlock;
+      if (length > bytes.length - idxInBytes) {
+        length = bytes.length - idxInBytes;
+      }
+      System.arraycopy(bytes, idxInBytes, dataByte.array(), startingIdxInBlock,
+          length);
+      this.cache.put(blockId, dataByte.array());
+
+      if (this.traceEnabled) {
+        TRACER.info("Task=WriteBlock,BlockID={},length={},SHA={}",
+            blockId, dataByte.array().length,
+            dataByte.array().length > 0 ?
+                DigestUtils.sha256Hex(dataByte.array()) : null);
+      }
+      dataByte.clear();
+      startingIdxInBlock = 0;
+      idxInBytes += length;
+    }
+
+    if (this.traceEnabled) {
+      TRACER.info("Task=WriteEnd,length={},location={} ",
+          bytes.length, storageIndex);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      cache.close();
+    } catch (IllegalStateException ise) {
+      LOGGER.error("Can not close the storage {}", ise);
+      throw ise;
+    }
+  }
+
+  /**
+   * Builder class for CBlockIStorageImpl.
+   */
+  public static class Builder {
+    private String userName;
+    private String volumeName;
+    private long volumeSize;
+    private int blockSize;
+    private List<Pipeline> containerList;
+    private Configuration conf;
+    private XceiverClientManager clientManager;
+    private ContainerCacheFlusher flusher;
+    private CBlockTargetMetrics metrics;
+
+    /**
+     * Constructs a builder.
+     */
+    Builder() {
+
+    }
+
+    public Builder setFlusher(ContainerCacheFlusher cacheFlusher) {
+      this.flusher = cacheFlusher;
+      return this;
+    }
+
+    /**
+     * set config.
+     *
+     * @param config - config
+     * @return Builder
+     */
+    public Builder setConf(Configuration config) {
+      this.conf = config;
+      return this;
+    }
+
+    /**
+     * set user name.
+     *
+     * @param cblockUserName - user name
+     * @return Builder
+     */
+    public Builder setUserName(String cblockUserName) {
+      this.userName = cblockUserName;
+      return this;
+    }
+
+    /**
+     * set volume name.
+     *
+     * @param cblockVolumeName -- volume name
+     * @return Builder
+     */
+    public Builder setVolumeName(String cblockVolumeName) {
+      this.volumeName = cblockVolumeName;
+      return this;
+    }
+
+    /**
+     * set volume size.
+     *
+     * @param cblockVolumeSize -- set volume size.
+     * @return Builder
+     */
+    public Builder setVolumeSize(long cblockVolumeSize) {
+      this.volumeSize = cblockVolumeSize;
+      return this;
+    }
+
+    /**
+     * set block size.
+     *
+     * @param cblockBlockSize -- block size
+     * @return Builder
+     */
+    public Builder setBlockSize(int cblockBlockSize) {
+      this.blockSize = cblockBlockSize;
+      return this;
+    }
+
+    /**
+     * Set container list.
+     *
+     * @param cblockContainerList - set the pipeline list
+     * @return Builder
+     */
+    public Builder setContainerList(List<Pipeline> cblockContainerList) {
+      this.containerList = cblockContainerList;
+      return this;
+    }
+
+    /**
+     * Set client manager.
+     *
+     * @param xceiverClientManager -- sets the client manager.
+     * @return Builder
+     */
+    public Builder setClientManager(XceiverClientManager xceiverClientManager) {
+      this.clientManager = xceiverClientManager;
+      return this;
+    }
+
+    /**
+     * Set Cblock Target Metrics.
+     *
+     * @param targetMetrics -- sets the cblock target metrics
+     * @return Builder
+     */
+    public Builder setCBlockTargetMetrics(CBlockTargetMetrics targetMetrics) {
+      this.metrics = targetMetrics;
+      return this;
+    }
+
+    /**
+     * Builds the CBlockStorageImpl.
+     *
+     * @return builds the CBlock Scsi Target.
+     */
+    public CBlockIStorageImpl build() throws IOException {
+      if (StringUtils.isBlank(userName)) {
+        throw new IllegalArgumentException("User name cannot be null or empty" +
+            ".");
+      }
+      if (StringUtils.isBlank(volumeName)) {
+        throw new IllegalArgumentException("Volume name cannot be null or " +
+            "empty");
+      }
+
+      if (volumeSize < 1) {
+        throw new IllegalArgumentException("Volume size cannot be negative or" +
+            " zero.");
+      }
+
+      if (blockSize < 1) {
+        throw new IllegalArgumentException("Block size cannot be negative or " +
+            "zero.");
+      }
+
+      if (containerList == null || containerList.size() == 0) {
+        throw new IllegalArgumentException("Container list cannot be null or " +
+            "empty");
+      }
+      if (clientManager == null) {
+        throw new IllegalArgumentException("Client manager cannot be null");
+      }
+      if (conf == null) {
+        throw new IllegalArgumentException("Configuration cannot be null");
+      }
+
+      if (flusher == null) {
+        throw new IllegalArgumentException("Flusher Cannot be null.");
+      }
+      CBlockIStorageImpl impl = new CBlockIStorageImpl(this.conf, this.userName,
+          this.volumeName, this.volumeSize, this.blockSize, this.containerList,
+          this.flusher);
+      impl.initCache(this.clientManager, this.metrics);
+      return impl;
+    }
+  }
+}
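
To make the offset arithmetic in read() and write() concrete, a worked example (all values assumed; a 4 KB block size is typical):

    int blockSize = 4096;
    long storageIndex = 10000L;   // byte offset supplied by the initiator
    int startingIdxInBlock = (int) (storageIndex % blockSize);  // 1808
    long firstBlockId = storageIndex / blockSize;               // block 2
    // An 8192-byte read starting here spans blocks 2, 3 and 4:
    // 2288 bytes from block 2, 4096 from block 3, 1808 from block 4.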

+ 119 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java

@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.cblock.jscsiHelper;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+
+/**
+ * This class maintains the various CBlock target statistics and publishes
+ * them through the metrics interfaces. It also registers the JMX MBean for
+ * RPC.
+ *
+ * It tracks stats like the cache hit and miss ratio as well as the latency
+ * of read and write ops.
+ */
+public class CBlockTargetMetrics {
+  @Metric private MutableCounterLong numReadOps;
+  @Metric private MutableCounterLong numWriteOps;
+  @Metric private MutableCounterLong numReadCacheHits;
+  @Metric private MutableCounterLong numReadCacheMiss;
+  @Metric private MutableCounterLong numReadLostBlocks;
+
+  @Metric private MutableRate dbReadLatency;
+  @Metric private MutableRate containerReadLatency;
+
+  @Metric private MutableRate dbWriteLatency;
+  @Metric private MutableRate containerWriteLatency;
+
+  public CBlockTargetMetrics() {
+  }
+
+  public static CBlockTargetMetrics create() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register("CBlockTargetMetrics",
+        "CBlock Target Metrics",
+        new CBlockTargetMetrics());
+  }
+
+  public void incNumReadOps() {
+    numReadOps.incr();
+  }
+
+  public void incNumWriteOps() {
+    numWriteOps.incr();
+  }
+
+  public void incNumReadCacheHits() {
+    numReadCacheHits.incr();
+  }
+
+  public void incNumReadCacheMiss() {
+    numReadCacheMiss.incr();
+  }
+
+  public void incNumReadLostBlocks() {
+    numReadLostBlocks.incr();
+  }
+
+  public void updateDBReadLatency(long latency) {
+    dbReadLatency.add(latency);
+  }
+
+  public void updateContainerReadLatency(long latency) {
+    containerReadLatency.add(latency);
+  }
+
+  public void updateDBWriteLatency(long latency) {
+    dbWriteLatency.add(latency);
+  }
+
+  public void updateContainerWriteLatency(long latency) {
+    containerWriteLatency.add(latency);
+  }
+
+  @VisibleForTesting
+  public long getNumReadOps() {
+    return numReadOps.value();
+  }
+
+  @VisibleForTesting
+  public long getNumWriteOps() {
+    return numWriteOps.value();
+  }
+
+  @VisibleForTesting
+  public long getNumReadCacheHits() {
+    return numReadCacheHits.value();
+  }
+
+  @VisibleForTesting
+  public long getNumReadCacheMiss() {
+    return numReadCacheMiss.value();
+  }
+
+  @VisibleForTesting
+  public long getNumReadLostBlocks() {
+    return numReadLostBlocks.value();
+  }
+}
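
A minimal usage sketch (the latency value is assumed; create() registers the bean with the default metrics system):

    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
    metrics.incNumReadOps();
    metrics.incNumReadCacheHits();
    metrics.updateDBReadLatency(5);        // milliseconds, assumed value
    assert metrics.getNumReadOps() == 1;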

+ 512 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java

@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.jscsiHelper;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
+import org.apache.hadoop.cblock.jscsiHelper.cache.impl.DiskBlock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.utils.LevelDBStore;
+import org.iq80.leveldb.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_CORE_POOL_SIZE;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_CORE_POOL_SIZE_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_MAX_POOL_SIZE;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_MAX_POOL_SIZE_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_QUEUE_SIZE_KB;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_QUEUE_SIZE_KB_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_THREAD_PRIORITY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_DISK_CACHE_PATH_KEY;
+
+/**
+ * Class that writes to remote containers.
+ */
+public class ContainerCacheFlusher implements Runnable {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerCacheFlusher.class);
+  private final LinkedBlockingQueue<Message> messageQueue;
+  private final ThreadPoolExecutor threadPoolExecutor;
+  private final ArrayBlockingQueue<Runnable> workQueue;
+  private final ConcurrentMap<String, RefCountedDB> dbMap;
+  private final ByteBuffer blockIDBuffer;
+  private final ConcurrentMap<String, Pipeline[]> pipelineMap;
+  private final AtomicLong remoteIO;
+  private final XceiverClientManager xceiverClientManager;
+  private final CBlockTargetMetrics metrics;
+  private AtomicBoolean shutdown;
+
+  private final ConcurrentMap<String, FinishCounter> finishCountMap;
+
+  /**
+   * Constructs the flusher that writes cached blocks to remote containers.
+   */
+  public ContainerCacheFlusher(Configuration config,
+      XceiverClientManager xceiverClientManager,
+      CBlockTargetMetrics metrics) {
+    int queueSize = config.getInt(DFS_CBLOCK_CACHE_QUEUE_SIZE_KB,
+        DFS_CBLOCK_CACHE_QUEUE_SIZE_KB_DEFAULT) * 1024;
+    int corePoolSize = config.getInt(DFS_CBLOCK_CACHE_CORE_POOL_SIZE,
+        DFS_CBLOCK_CACHE_CORE_POOL_SIZE_DEFAULT);
+    int maxPoolSize = config.getInt(DFS_CBLOCK_CACHE_MAX_POOL_SIZE,
+        DFS_CBLOCK_CACHE_MAX_POOL_SIZE_DEFAULT);
+    long keepAlive = config.getLong(DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS,
+        DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS_DEFAULT);
+    int threadPri = config.getInt(DFS_CBLOCK_CACHE_THREAD_PRIORITY,
+        DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT);
+    int blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
+        DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * 1024;
+
+    LOG.info("Cache: Core Pool Size: {}", corePoolSize);
+    LOG.info("Cache: Keep Alive: {}", keepAlive);
+    LOG.info("Cache: Max Pool Size: {}", maxPoolSize);
+    LOG.info("Cache: Thread Pri: {}", threadPri);
+    LOG.info("Cache: BlockBuffer Size: {}", blockBufferSize);
+
+    shutdown = new AtomicBoolean(false);
+    messageQueue = new LinkedBlockingQueue<>();
+    workQueue = new ArrayBlockingQueue<>(queueSize, true);
+
+    ThreadFactory workerThreadFactory = new ThreadFactoryBuilder()
+        .setNameFormat("Cache Block Writer Thread #%d")
+        .setDaemon(true)
+        .setPriority(threadPri)
+        .build();
+    threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
+        keepAlive, TimeUnit.SECONDS, workQueue, workerThreadFactory,
+        new ThreadPoolExecutor.AbortPolicy());
+    threadPoolExecutor.prestartAllCoreThreads();
+
+    dbMap = new ConcurrentHashMap<>();
+    pipelineMap = new ConcurrentHashMap<>();
+    blockIDBuffer = ByteBuffer.allocateDirect(blockBufferSize);
+    this.xceiverClientManager = xceiverClientManager;
+    this.metrics = metrics;
+    this.remoteIO = new AtomicLong();
+
+    this.finishCountMap = new ConcurrentHashMap<>();
+    checkExistingDirtyLog(config);
+  }
+
+  private void checkExistingDirtyLog(Configuration config) {
+    File dbPath = Paths.get(config.get(DFS_CBLOCK_DISK_CACHE_PATH_KEY,
+        DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT)).toFile();
+    if (!dbPath.exists()) {
+      LOG.info("No existing dirty log found at {}", dbPath);
+      return;
+    }
+    LOG.info("Need to check and requeue existing dirty log {}", dbPath);
+    HashMap<String, ArrayList<String>> allFiles = new HashMap<>();
+    traverse(dbPath, allFiles);
+    for (Map.Entry<String, ArrayList<String>> entry : allFiles.entrySet()) {
+      String parentPath = entry.getKey();
+      for (String fileName : entry.getValue()) {
+        LOG.info("found this {} with {}", parentPath, fileName);
+        processDirtyBlocks(parentPath, fileName);
+      }
+    }
+  }
+
+  private void traverse(File path, HashMap<String, ArrayList<String>> files) {
+    if (path.isFile()) {
+      if (path.getName().startsWith("DirtyLog")) {
+        LOG.debug("found this {} with {}", path.getParent(), path.getName());
+        if (!files.containsKey(path.getParent())) {
+          files.put(path.getParent(), new ArrayList<>());
+        }
+        files.get(path.getParent()).add(path.getName());
+      }
+    } else {
+      File[] listFiles = path.listFiles();
+      if (listFiles != null) {
+        for (File subPath : listFiles) {
+          traverse(subPath, files);
+        }
+      }
+    }
+  }
+
+  /**
+   * Gets the CBlockTargetMetrics.
+   *
+   * @return CBlockTargetMetrics
+   */
+  public CBlockTargetMetrics getTargetMetrics() {
+    return metrics;
+  }
+
+  /**
+   * Gets the XceiverClientManager.
+   *
+   * @return XceiverClientManager
+   */
+  public XceiverClientManager getXceiverClientManager() {
+    return xceiverClientManager;
+  }
+
+  /**
+   * Shutdown this instance.
+   */
+  public void shutdown() {
+    this.shutdown.set(true);
+    threadPoolExecutor.shutdown();
+  }
+
+  public long incrementRemoteIO() {
+    return remoteIO.incrementAndGet();
+  }
+
+  /**
+   * Processes a block cache file and queues those blocks for the remote I/O.
+   *
+   * @param dbPath - Location where the DB can be found.
+   * @param fileName - Block Cache File Name
+   */
+  public void processDirtyBlocks(String dbPath, String fileName) {
+    LOG.info("Adding {}/{} to queue. Queue Length: {}", dbPath, fileName,
+        messageQueue.size());
+    this.messageQueue.add(new Message(dbPath, fileName));
+  }
+
+  public Logger getLOG() {
+    return LOG;
+  }
+
+  /**
+   * Opens a DB if needed or returns a handle to an already open DB.
+   *
+   * @param dbPath -- dbPath
+   * @param cacheSize - cacheSize
+   * @return the levelDB on the given path.
+   * @throws IOException
+   */
+  public synchronized LevelDBStore openDB(String dbPath, int cacheSize)
+      throws IOException {
+    if (dbMap.containsKey(dbPath)) {
+      RefCountedDB refDB = dbMap.get(dbPath);
+      refDB.open();
+      return refDB.db;
+    } else {
+      Options options = new Options();
+      options.cacheSize(cacheSize * (1024L * 1024L));
+      options.createIfMissing(true);
+      LevelDBStore cacheDB = new LevelDBStore(
+          new File(getDBFileName(dbPath)), options);
+      RefCountedDB refDB = new RefCountedDB(dbPath, cacheDB);
+      dbMap.put(dbPath, refDB);
+      return cacheDB;
+    }
+  }
+
+  /**
+   * Updates the container map. This data never changes, so updating it
+   * during restarts is harmless.
+   *
+   * @param dbPath - DbPath
+   * @param containerList - Container list.
+   */
+  public void register(String dbPath, Pipeline[] containerList) {
+    pipelineMap.put(dbPath, containerList);
+  }
+
+  private String getDBFileName(String dbPath) {
+    return dbPath + ".db";
+  }
+
+  LevelDBStore getCacheDB(String dbPath) {
+    return dbMap.get(dbPath).db;
+  }
+
+  /**
+   * Close the DB if we don't have any outstanding references.
+   *
+   * @param dbPath - dbPath
+   * @throws IOException
+   */
+  public synchronized void closeDB(String dbPath) throws IOException {
+    if (dbMap.containsKey(dbPath)) {
+      RefCountedDB refDB = dbMap.get(dbPath);
+      int count = refDB.close();
+      if (count == 0) {
+        dbMap.remove(dbPath);
+      }
+    }
+  }
+
+  Pipeline getPipeline(String dbPath, long blockId) {
+    Pipeline[] containerList = pipelineMap.get(dbPath);
+    Preconditions.checkNotNull(containerList);
+    int containerIdx = (int) (blockId % containerList.length);
+    long cBlockIndex =
+        Longs.fromByteArray(containerList[containerIdx].getData());
+    if (cBlockIndex > 0) {
+      // This catches the case when we get a wrong container in the ordering
+      // of the containers.
+      Preconditions.checkState(containerIdx % cBlockIndex == 0,
+          "The container ID computed should match with the container index " +
+              "returned from cBlock Server.");
+    }
+    return containerList[containerIdx];
+  }
+
+  public void incFinishCount(String fileName) {
+    if (!finishCountMap.containsKey(fileName)) {
+      LOG.error("No record for such file:" + fileName);
+      return;
+    }
+    finishCountMap.get(fileName).incCount();
+    if (finishCountMap.get(fileName).isFileDeleted()) {
+      finishCountMap.remove(fileName);
+    }
+  }
+
+  /**
+   * Drains the message queue: for each dirty log file, reads the block IDs
+   * it contains and submits a BlockWriterTask per block to the thread pool.
+   *
+   * @see Thread#run()
+   */
+  @Override
+  public void run() {
+    while (!this.shutdown.get()) {
+      try {
+        Message message = messageQueue.take();
+        LOG.debug("Got message to process -- DB Path : {} , FileName; {}",
+            message.getDbPath(), message.getFileName());
+        String fullPath = Paths.get(message.getDbPath(),
+            message.getFileName()).toString();
+        ReadableByteChannel fileChannel = new FileInputStream(fullPath)
+            .getChannel();
+        // TODO: We can batch and unique the IOs here. First getting the code
+        // to work, we will add those later.
+        int bytesRead = fileChannel.read(blockIDBuffer);
+        fileChannel.close();
+        LOG.debug("Read blockID log of size: {} position {} remaining {}",
+            bytesRead, blockIDBuffer.position(), blockIDBuffer.remaining());
+        // current position of in the buffer in bytes, divided by number of
+        // bytes per long (which is calculated by number of bits per long
+        // divided by number of bits per byte) gives the number of blocks
+        int blockCount = blockIDBuffer.position() / (Long.SIZE / Byte.SIZE);
+        if (finishCountMap.containsKey(message.getFileName())) {
+          // In theory this should never happen. But if it happened,
+          // we need to know it...
+          LOG.error("Adding DirtyLog file again {} current count {} new {}",
+              message.getFileName(),
+              finishCountMap.get(message.getFileName()).expectedCount,
+              blockCount);
+        }
+        finishCountMap.put(message.getFileName(),
+            new FinishCounter(blockCount, message.getDbPath(),
+                message.getFileName()));
+        // Use flip instead of rewind so that the limit is set to the end of
+        // the data just read.
+        blockIDBuffer.flip();
+        LOG.debug("Remaining blocks count {} and {}", blockIDBuffer.remaining(),
+            blockCount);
+        while (blockIDBuffer.remaining() >= (Long.SIZE / Byte.SIZE)) {
+          long blockID = blockIDBuffer.getLong();
+          LogicalBlock block = new DiskBlock(blockID, null, false);
+          BlockWriterTask blockWriterTask = new BlockWriterTask(block, this,
+              message.getDbPath(), message.getFileName());
+          threadPoolExecutor.submit(blockWriterTask);
+        }
+        blockIDBuffer.clear();
+      } catch (InterruptedException e) {
+        LOG.info("ContainerCacheFlusher is interrupted.", e);
+      } catch (FileNotFoundException e) {
+        LOG.error("Unable to find the dirty blocks file. This will cause " +
+            "data errors. Please stop using this volume.", e);
+      } catch (IOException e) {
+        LOG.error("Unable to read the dirty blocks file. This will cause " +
+            "data errors. Please stop using this volume.", e);
+      } catch (Exception e) {
+        LOG.error("Generic exception.", e);
+      }
+    }
+    LOG.info("Exiting flusher");
+  }
+
+  /**
+   * Keeps a reference-counted DB that we close only when the reference
+   * count has gone to zero.
+   */
+  private static class RefCountedDB {
+    private LevelDBStore db;
+    private AtomicInteger refcount;
+    private String dbPath;
+
+    /**
+     * RefCountedDB DB ctor.
+     *
+     * @param dbPath - DB path.
+     * @param db - LevelDBStore db
+     */
+    RefCountedDB(String dbPath, LevelDBStore db) {
+      this.db = db;
+      this.refcount = new AtomicInteger(1);
+      this.dbPath = dbPath;
+    }
+
+    /**
+     * close the DB if possible.
+     */
+    public int close() throws IOException {
+      int count = this.refcount.decrementAndGet();
+      if (count == 0) {
+        LOG.info("Closing the LevelDB. {} ", this.dbPath);
+        db.close();
+      }
+      return count;
+    }
+
+    public void open() {
+      this.refcount.incrementAndGet();
+    }
+  }
+
+  /**
+   * The message held in processing queue.
+   */
+  private static class Message {
+    private String dbPath;
+    private String fileName;
+
+    /**
+     * A message that holds the path of a dirty blocks log and the path of
+     * the DB it belongs to.
+     *
+     * @param dbPath - path to the DB
+     * @param fileName - name of the dirty blocks log file
+     */
+    Message(String dbPath, String fileName) {
+      this.dbPath = dbPath;
+      this.fileName = fileName;
+    }
+
+    public String getDbPath() {
+      return dbPath;
+    }
+
+    public void setDbPath(String dbPath) {
+      this.dbPath = dbPath;
+    }
+
+    public String getFileName() {
+      return fileName;
+    }
+
+    public void setFileName(String fileName) {
+      this.fileName = fileName;
+    }
+  }
+
+  private static class FinishCounter {
+    private final long expectedCount;
+    private final String dbPath;
+    private final String dirtyLogPath;
+    private final AtomicLong currentCount;
+    private AtomicBoolean fileDeleted;
+
+    FinishCounter(long expectedCount, String dbPath,
+        String dirtyLogPath) {
+      this.expectedCount = expectedCount;
+      this.dbPath = dbPath;
+      this.dirtyLogPath = dirtyLogPath;
+      this.currentCount = new AtomicLong(0);
+      this.fileDeleted = new AtomicBoolean(false);
+    }
+
+    public boolean isFileDeleted() {
+      return fileDeleted.get();
+    }
+
+    public void incCount() {
+      long count = this.currentCount.incrementAndGet();
+      if (count >= expectedCount) {
+        String filePath = String.format("%s/%s", dbPath, dirtyLogPath);
+        LOG.debug(
+            "Deleting {} with count {} {}", filePath, count, expectedCount);
+        try {
+          Path path = Paths.get(filePath);
+          Files.delete(path);
+          // the following part tries to remove the directory if it is empty
+          // but not sufficient, because the .db directory still exists....
+          // TODO how to handle the .db directory?
+          /*Path parent = path.getParent();
+          if (parent.toFile().listFiles().length == 0) {
+            Files.delete(parent);
+          }*/
+          fileDeleted.set(true);
+        } catch (IOException e) {
+          LOG.error(
+              "Error deleting dirty log file {} {}", filePath, e.toString());
+        }
+      }
+    }
+  }
+}
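
A hedged sketch of how the pieces above fit together (config, clientManager, metrics, dbPath, and pipelines are assumed to be in scope inside a method that throws IOException; the dirty log file name is hypothetical):

    ContainerCacheFlusher flusher =
        new ContainerCacheFlusher(config, clientManager, metrics);
    flusher.register(dbPath, pipelines);           // container ordering for getPipeline
    LevelDBStore db = flusher.openDB(dbPath, 256); // 256 MB LevelDB cache, assumed size
    new Thread(flusher).start();                   // drains the message queue
    flusher.processDirtyBlocks(dbPath, "DirtyLog.0"); // hypothetical file name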

+ 52 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/CacheModule.java

@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.jscsiHelper.cache;
+
+import java.io.IOException;
+
+/**
+ * Defines the interface for cache implementations. The cache will be called
+ * by cblock storage module when it performs IO operations.
+ */
+public interface CacheModule {
+  /**
+   * Checks if the block is cached; if so, returns the cached object.
+   * Otherwise, loads it from the data source and puts it into the cache.
+   *
+   * @param blockID
+   * @return the target block.
+   */
+  LogicalBlock get(long blockID) throws IOException;
+
+  /**
+   * Puts the value for the given block into the cache.
+   * @param blockID
+   * @param value
+   */
+  void put(long blockID, byte[] value) throws IOException;
+
+  void flush() throws IOException;
+
+  void start() throws IOException;
+
+  void stop() throws IOException;
+
+  void close() throws IOException;
+
+  boolean isDirtyCache();
+}
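
A minimal sketch of how a caller drives this interface (the cache instance and block IDs are assumed; per the read path above, get() returns a zero-filled block for a key that was never written, and array() assumes an array-backed buffer, which holds for DiskBlock below):

    void copyBlock(CacheModule cache, long srcID, long dstID)
        throws IOException {
      LogicalBlock block = cache.get(srcID);   // zero-filled if never written
      byte[] data = block.getData().array();
      cache.put(dstID, data);                  // cached now, flushed later
    }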

+ 50 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/LogicalBlock.java

@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.jscsiHelper.cache;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Logical Block is the data structure that we write to the cache; the key
+ * and data get written to remote containers. The rest is used for
+ * bookkeeping in the cache.
+ */
+public interface LogicalBlock {
+  /**
+   * Returns the data stream of this block.
+   * @return - ByteBuffer
+   */
+  ByteBuffer getData();
+
+  /**
+   * Frees the byte buffer since we don't need it any more.
+   */
+  void clearData();
+
+  /**
+   * Returns the Block ID for this Block.
+   * @return long - BlockID
+   */
+  long getBlockID();
+
+  /**
+   * Flag that tells us if this block has been persisted to container.
+   * @return whether this block is now persistent
+   */
+  boolean isPersisted();
+}

+ 180 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java

@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.jscsiHelper.cache.impl;
+
+import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics;
+import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher;
+import org.apache.hadoop.cblock.jscsiHelper.cache.CacheModule;
+import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+
+import java.io.IOException;
+import java.util.List;
+
+
+/**
+ * A local cache used by the CBlock ISCSI server. This class is enabled or
+ * disabled via config settings.
+ *
+ * TODO : currently, this class is just a placeholder.
+ */
+public final class CBlockLocalCache implements CacheModule {
+
+  private CBlockLocalCache() {
+  }
+
+  @Override
+  public LogicalBlock get(long blockID) throws IOException {
+    return null;
+  }
+
+  @Override
+  public void put(long blockID, byte[] data) throws IOException {
+  }
+
+  @Override
+  public void flush() throws IOException {
+
+  }
+
+  @Override
+  public void start() throws IOException {
+
+  }
+
+  @Override
+  public void stop() throws IOException {
+
+  }
+
+  @Override
+  public void close() throws IOException {
+
+  }
+
+  @Override
+  public boolean isDirtyCache() {
+    return false;
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder class for CBlockLocalCache.
+   */
+  public static class Builder {
+
+    /**
+     * Sets the Config to be used by this cache.
+     *
+     * @param configuration - Config
+     * @return Builder
+     */
+    public Builder setConfiguration(Configuration configuration) {
+      return this;
+    }
+
+    /**
+     * Sets the user name who is the owner of this volume.
+     *
+     * @param userName - name of the owner, please note this is not the current
+     * user name.
+     * @return - Builder
+     */
+    public Builder setUserName(String userName) {
+      return this;
+    }
+
+    /**
+     * Sets the VolumeName.
+     *
+     * @param volumeName - Name of the volume
+     * @return Builder
+     */
+    public Builder setVolumeName(String volumeName) {
+      return this;
+    }
+
+    /**
+     * Sets the Pipelines that form this volume.
+     *
+     * @param pipelines - list of pipelines
+     * @return Builder
+     */
+    public Builder setPipelines(List<Pipeline> pipelines) {
+      return this;
+    }
+
+    /**
+     * Sets the Client Manager that manages the communication with containers.
+     *
+     * @param clientManager - clientManager.
+     * @return - Builder
+     */
+    public Builder setClientManager(XceiverClientManager clientManager) {
+      return this;
+    }
+
+    /**
+     * Sets the block size -- Typical sizes are 4KB, 8KB etc.
+     *
+     * @param blockSize - BlockSize.
+     * @return - Builder
+     */
+    public Builder setBlockSize(int blockSize) {
+      return this;
+    }
+
+    /**
+     * Sets the volumeSize.
+     *
+     * @param volumeSize - VolumeSize
+     * @return - Builder
+     */
+    public Builder setVolumeSize(long volumeSize) {
+      return this;
+    }
+
+    /**
+     * Set flusher.
+     * @param flusher - cache Flusher
+     * @return Builder.
+     */
+    public Builder setFlusher(ContainerCacheFlusher flusher) {
+      return this;
+    }
+
+    /**
+     * Sets the cblock Metrics.
+     *
+     * @param targetMetrics - CBlock Target Metrics
+     * @return - Builder
+     */
+    public Builder setCBlockTargetMetrics(CBlockTargetMetrics targetMetrics) {
+      return this;
+    }
+
+    public CBlockLocalCache build() throws IOException {
+      return new CBlockLocalCache();
+    }
+  }
+}
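
A minimal sketch of how a caller might wire up this cache through its builder, using only the setters defined above. This is illustrative, not part of the patch: `conf`, `pipelines`, `clientManager`, `flusher`, `metrics`, and `data` are assumed to be in scope, and since the class is still a placeholder, `put` is a no-op and `get` returns null.

    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
        .setConfiguration(conf)
        .setUserName("volumeOwner")
        .setVolumeName("volume1")
        .setPipelines(pipelines)
        .setClientManager(clientManager)
        .setBlockSize(4096)
        .setVolumeSize(50L * 1024 * 1024 * 1024)
        .setFlusher(flusher)
        .setCBlockTargetMetrics(metrics)
        .build();
    cache.start();
    cache.put(0, data);                 // no-op until the cache is implemented
    LogicalBlock block = cache.get(0);  // currently returns null
    cache.flush();
    cache.stop();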

+ 77 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/DiskBlock.java

@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.jscsiHelper.cache.impl;
+
+import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
+import java.nio.ByteBuffer;
+
+/**
+ * Implementation of LogicalBlock backed by an in-memory byte buffer.
+ */
+public class DiskBlock implements LogicalBlock {
+  private ByteBuffer data;
+  private long blockID;
+  private boolean persisted;
+
+  /**
+   * Constructs a DiskBlock from the given parameters.
+   * @param blockID - 64-bit block ID
+   * @param data - byte array holding the block data; may be null
+   * @param persisted - flag which tells us if this block has been persisted
+   * to a remote container
+   */
+  public DiskBlock(long blockID, byte[] data, boolean persisted) {
+    if (data != null) {
+      this.data = ByteBuffer.wrap(data);
+    }
+    this.blockID = blockID;
+    this.persisted = persisted;
+  }
+
+  @Override
+  public ByteBuffer getData() {
+    return data;
+  }
+
+  /**
+   * Resets the byte buffer since we don't need its contents any more.
+   * Note that ByteBuffer#clear only resets the position and limit; it does
+   * not release the underlying memory.
+   */
+  @Override
+  public void clearData() {
+    if (data != null) {
+      data.clear();
+    }
+  }
+
+  @Override
+  public long getBlockID() {
+    return blockID;
+  }
+
+  @Override
+  public boolean isPersisted() {
+    return persisted;
+  }
+
+  /**
+   * Sets the value of persisted.
+   * @param value - True if this has been persisted to container, false
+   * otherwise.
+   */
+  public void setPersisted(boolean value) {
+    persisted = value;
+  }
+
+}
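
A short illustrative sketch of the intended DiskBlock life cycle, assuming `data` is a byte array in scope: a block starts out unpersisted and is marked persisted once it has been written to a remote container.

    DiskBlock block = new DiskBlock(42L, data, false);
    ByteBuffer buf = block.getData();   // wraps the original array
    if (!block.isPersisted()) {
      // ... write the block to its container, e.g. via the flusher ...
      block.setPersisted(true);
    }
    block.clearData();                  // resets the buffer for reuse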

+ 18 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/package-info.java

@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.jscsiHelper.cache.impl;

+ 18 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/package-info.java

@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.jscsiHelper.cache;

+ 18 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/package-info.java

@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.jscsiHelper;

+ 157 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestStorageImpl.java

@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.cblock;
+
+import com.google.common.primitives.Longs;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.cblock.jscsiHelper.CBlockIStorageImpl;
+import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics;
+import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_DISK_CACHE_PATH_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_TRACE_IO;
+
+/**
+ * This class tests the CBlock storage layer.
+ */
+public class TestStorageImpl {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestStorageImpl.class);
+  private static final long GB = 1024 * 1024 * 1024;
+  private static final int KB = 1024;
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration config;
+  private static StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocationClient;
+  private static XceiverClientManager xceiverClientManager;
+
+  @BeforeClass
+  public static void init() throws IOException {
+    config = new OzoneConfiguration();
+    URL p = config.getClass().getResource("");
+    String path = p.getPath().concat(
+        TestOzoneContainer.class.getSimpleName());
+    config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+    config.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    cluster = new MiniOzoneCluster.Builder(config)
+        .numDataNodes(1).setHandlerType("distributed").build();
+    storageContainerLocationClient = cluster
+        .createStorageContainerLocationClient();
+    xceiverClientManager = new XceiverClientManager(config);
+  }
+
+  @AfterClass
+  public static void shutdown() throws InterruptedException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    IOUtils.cleanup(null, storageContainerLocationClient, cluster);
+  }
+
+  /**
+   * getContainerPipeline creates a set of containers and returns the
+   * Pipelines that define those containers.
+   *
+   * @param count - Number of containers to create.
+   * @return - List of Pipelines.
+   * @throws IOException
+   */
+  private List<Pipeline> getContainerPipeline(int count) throws IOException {
+    List<Pipeline> containerPipelines = new LinkedList<>();
+    for (int x = 0; x < count; x++) {
+      String traceID = "trace" + RandomStringUtils.randomNumeric(4);
+      String containerName = "container" + RandomStringUtils.randomNumeric(10);
+      Pipeline pipeline =
+          storageContainerLocationClient.allocateContainer(containerName);
+      XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
+      ContainerProtocolCalls.createContainer(client, traceID);
+      // This step is needed because private data is set on pipelines when
+      // the list is read from CBlockServer, so we mimic that action here.
+      pipeline.setData(Longs.toByteArray(x));
+      containerPipelines.add(pipeline);
+    }
+    return containerPipelines;
+  }
+
+  @Test
+  public void testStorageImplBasicReadWrite() throws Exception {
+    OzoneConfiguration oConfig = new OzoneConfiguration();
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    long volumeSize = 50L * GB;
+    int blockSize = 4096;
+    byte[] data =
+        RandomStringUtils.randomAlphanumeric(10 * (1024 * 1024))
+            .getBytes(StandardCharsets.UTF_8);
+    String hash = DigestUtils.sha256Hex(data);
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(oConfig,
+        xceiverClientManager, metrics);
+    CBlockIStorageImpl ozoneStore = CBlockIStorageImpl.newBuilder()
+        .setUserName(userName)
+        .setVolumeName(volumeName)
+        .setVolumeSize(volumeSize)
+        .setBlockSize(blockSize)
+        .setContainerList(getContainerPipeline(10))
+        .setClientManager(xceiverClientManager)
+        .setConf(oConfig)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    ozoneStore.write(data, 0);
+
+    // Currently, the local cache is a placeholder and does not actually
+    // handle reads and writes, so the read below is guaranteed to fail.
+    // Once CBlockLocalCache is properly implemented, we should uncomment
+    // the following lines.
+    // TODO uncomment the following.
+
+    //byte[] newData = new byte[10 * 1024 * 1024];
+    //ozoneStore.read(newData, 0);
+    //String newHash = DigestUtils.sha256Hex(newData);
+    //Assert.assertEquals("hashes don't match.", hash, newHash);
+    GenericTestUtils.waitFor(() -> !ozoneStore.getCache().isDirtyCache(),
+        100, 20 * 1000);
+
+    ozoneStore.close();
+  }
+}
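
For reference, a hedged note on the wait at the end of the test: GenericTestUtils.waitFor polls the supplied check until it returns true or the timeout expires, so the test blocks until the flusher has drained the dirty cache before the store is closed.

    // Poll every 100 ms; fail with a timeout after 20 seconds.
    GenericTestUtils.waitFor(
        () -> !ozoneStore.getCache().isDirtyCache(), // done when cache is clean
        100,        // check interval, in milliseconds
        20 * 1000); // timeout, in milliseconds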