
HDFS-11537. Block Storage : add cache layer. Contributed by Chen Liang.

Xiaoyu Yao 8 years ago
parent
commit
17a6e62629

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java

@@ -82,6 +82,11 @@ public final class CBlockConfigKeys {
   public static final String DFS_CBLOCK_TRACE_IO = "dfs.cblock.trace.io";
   public static final boolean DFS_CBLOCK_TRACE_IO_DEFAULT = false;
 
+  public static final String DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO =
+      "dfs.cblock.short.circuit.io";
+  public static final boolean DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO_DEFAULT =
+      false;
+
   /**
    * Cache size in 1000s of entries. 256 indicates 256 * 1024.
    */
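
For reference, a minimal sketch (not part of this patch, and assuming imports of org.apache.hadoop.conf.Configuration plus static imports of the two keys above from CBlockConfigKeys) of how a caller might toggle the new short-circuit flag together with the existing trace flag before building a cache:

    Configuration conf = new Configuration();
    // dfs.cblock.short.circuit.io defaults to false; enable cache-backed I/O.
    conf.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
    // dfs.cblock.trace.io defaults to false; enable only for debugging since
    // tracing hashes every block read and written.
    conf.setBoolean(DFS_CBLOCK_TRACE_IO, true);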

+ 237 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java

@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.jscsiHelper.cache.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.LevelDBStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Paths;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT;
+
+/**
+ * A writer that is used to write blocks asynchronously to the container.
+ */
+public class AsyncBlockWriter {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AsyncBlockWriter.class);
+
+  /**
+   * Right now we have a single buffer and we block when we write it to
+   * the file.
+   */
+  private final ByteBuffer blockIDBuffer;
+
+  /**
+   * XceiverClientManager is used to get client connections to a set of
+   * machines.
+   */
+  private final XceiverClientManager xceiverClientManager;
+
+  /**
+   * This lock is used to signal the re-queuing thread. The requeue thread
+   * wakes up as soon as it is signaled that some blocks are in the retry
+   * queue. We retry aggressively since a retried block automatically moves
+   * to the end of the queue.
+   * <p>
+   * In the event a container is unavailable for a long time, we can either
+   * fail all writes or remap and let the writes succeed. The easier
+   * semantics is to fail the volume until the container is recovered by SCM.
+   */
+  private final Lock lock;
+  private final Condition notEmpty;
+  /**
+   * The cache this writer is operating against.
+   */
+  private final CBlockLocalCache parentCache;
+  private final int blockBufferSize;
+  private static final String DIRTY_LOG_PREFIX = "DirtyLog";
+  private AtomicLong localIoCount;
+
+  /**
+   * Constructs an Async Block Writer.
+   *
+   * @param config - Config
+   * @param cache - Parent Cache for this writer
+   */
+  public AsyncBlockWriter(Configuration config, CBlockLocalCache cache) {
+
+    Preconditions.checkNotNull(cache, "Cache cannot be null.");
+    Preconditions.checkNotNull(cache.getCacheDB(), "DB cannot be null.");
+    localIoCount = new AtomicLong();
+    blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
+        DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * 1024;
+    LOG.info("Cache: Block Size: {}", blockBufferSize);
+    lock = new ReentrantLock();
+    notEmpty = lock.newCondition();
+    parentCache = cache;
+    xceiverClientManager = cache.getClientManager();
+    blockIDBuffer = ByteBuffer.allocateDirect(blockBufferSize);
+  }
+
+  /**
+   * Return the log to write to.
+   *
+   * @return Logger.
+   */
+  public static Logger getLOG() {
+    return LOG;
+  }
+
+  /**
+   * Get the CacheDB.
+   *
+   * @return LevelDB Handle
+   */
+  LevelDBStore getCacheDB() {
+    return parentCache.getCacheDB();
+  }
+
+  /**
+   * Returns the client manager.
+   *
+   * @return XceiverClientManager
+   */
+  XceiverClientManager getXceiverClientManager() {
+    return xceiverClientManager;
+  }
+
+  /**
+   * Increments the count of local I/O packets that have gone into this
+   * device.
+   */
+  public long incrementLocalIO() {
+    return localIoCount.incrementAndGet();
+  }
+
+  /**
+   * Returns the local I/O count for this device.
+   * @return the count of local I/O operations
+   */
+  public long getLocalIOCount() {
+    return localIoCount.get();
+  }
+
+  /**
+   * Writes a block to LevelDB store and queues a work item for the system to
+   * sync the block to containers.
+   *
+   * @param block - Logical Block
+   */
+  public void writeBlock(LogicalBlock block) throws IOException {
+    byte[] keybuf = Longs.toByteArray(block.getBlockID());
+    if (parentCache.isShortCircuitIOEnabled()) {
+      long startTime = Time.monotonicNow();
+      getCacheDB().put(keybuf, block.getData().array());
+      incrementLocalIO();
+      long endTime = Time.monotonicNow();
+      parentCache.getTargetMetrics().updateDBWriteLatency(
+                                                    endTime - startTime);
+      if (parentCache.isTraceEnabled()) {
+        String datahash = DigestUtils.sha256Hex(block.getData().array());
+        parentCache.getTracer().info(
+            "Task=WriterTaskDBPut,BlockID={},Time={},SHA={}",
+            block.getBlockID(), endTime - startTime, datahash);
+      }
+      block.clearData();
+    } else {
+      // TODO : Support Direct I/O
+      LOG.error("Non-Cache I/O is not supported at this point of time.");
+      throw new IllegalStateException("Cache is required and cannot be " +
+          "disabled now.");
+    }
+    if (blockIDBuffer.remaining() <= (Long.SIZE / Byte.SIZE)) {
+      long startTime = Time.monotonicNow();
+      blockIDBuffer.flip();
+      writeBlockBufferToFile(blockIDBuffer);
+      blockIDBuffer.clear();
+      long endTime = Time.monotonicNow();
+      if (parentCache.isTraceEnabled()) {
+        parentCache.getTracer().info(
+            "Task=DirtyBlockLogWrite,Time={}", endTime - startTime);
+      }
+    }
+    blockIDBuffer.putLong(block.getBlockID());
+  }
+
+  /**
+   * Write Block Buffer to file.
+   *
+   * @param blockID - ByteBuffer of block IDs to flush
+   * @throws IOException
+   */
+  private void writeBlockBufferToFile(ByteBuffer blockID)
+      throws IOException {
+    boolean append = false;
+    String fileName =
+        String.format("%s.%s", DIRTY_LOG_PREFIX, Time.monotonicNow());
+    File logDir = new File(parentCache.getDbPath().toString());
+    if (!logDir.exists() && !logDir.mkdirs()) {
+      LOG.error("Unable to create the log directory, Critical error cannot " +
+          "continue. Log Dir : {}", logDir);
+      throw new IllegalStateException("Cache Directory create failed, Cannot " +
+          "continue. Log Dir: {}" + logDir);
+    }
+    String log = Paths.get(parentCache.getDbPath().toString(), fileName)
+        .toString();
+
+    try (FileChannel channel = new FileOutputStream(log, append).getChannel()) {
+      channel.write(blockID);
+    }
+    blockID.clear();
+    parentCache.processDirtyMessage(fileName);
+  }
+
+  /**
+   * Shutdown by writing any pending I/O to dirtylog buffer.
+   */
+  public void shutdown() {
+    try {
+      writeBlockBufferToFile(this.blockIDBuffer);
+    } catch (IOException e) {
+      LOG.error("Unable to sync the Block map to disk -- This might cause a " +
+          "data loss or corruption");
+    }
+  }
+
+  /**
+   * Returns tracer.
+   *
+   * @return Tracer
+   */
+  Logger getTracer() {
+    return parentCache.getTracer();
+  }
+
+}
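
To make the dirty-log mechanism in writeBlock() and writeBlockBufferToFile() easier to follow, here is a self-contained sketch under simplified assumptions (the fixed buffer size and nanoTime-based file names are illustrative; the writer above uses a configured buffer size and Time.monotonicNow()). Block IDs accumulate in a direct ByteBuffer and are flushed to a timestamped dirty-log file when the buffer is close to full:

    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Paths;

    public final class DirtyLogSketch {
      // Illustrative fixed size; AsyncBlockWriter reads this from config.
      private static final int BUFFER_SIZE = 1024;
      private final ByteBuffer ids = ByteBuffer.allocateDirect(BUFFER_SIZE);
      private final String logDir;

      public DirtyLogSketch(String logDir) {
        this.logDir = logDir;
      }

      // Record a block ID, flushing to a new dirty-log file first if there
      // is no room left for another 8-byte ID.
      public void recordBlock(long blockId) throws IOException {
        if (ids.remaining() <= Long.BYTES) {
          flush();
        }
        ids.putLong(blockId);
      }

      public void flush() throws IOException {
        ids.flip();
        String name = Paths.get(logDir,
            "DirtyLog." + System.nanoTime()).toString();
        try (FileChannel channel = new FileOutputStream(name).getChannel()) {
          channel.write(ids);
        }
        ids.clear();
      }
    }

In the patch, writeBlockBufferToFile() then hands the file name to processDirtyMessage(), which forwards it to the ContainerCacheFlusher for replay against the remote containers.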

+ 450 - 12
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java

@@ -17,36 +17,278 @@
  */
 package org.apache.hadoop.cblock.jscsiHelper.cache.impl;
 
-import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher;
 import org.apache.hadoop.cblock.jscsiHelper.cache.CacheModule;
 import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.scm.XceiverClientManager;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.LevelDBStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.List;
+import java.util.UUID;
 
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_DISK_CACHE_PATH_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO_DEFAULT;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_TRACE_IO;
+import static org.apache.hadoop.cblock.CBlockConfigKeys
+    .DFS_CBLOCK_TRACE_IO_DEFAULT;
 
 /**
  * A local cache used by the CBlock ISCSI server. This class is enabled or
  * disabled via config settings.
- *
- * TODO : currently, this class is a just a place holder.
  */
-final public class CBlockLocalCache implements CacheModule {
+public class CBlockLocalCache implements CacheModule {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CBlockLocalCache.class);
+  private static final Logger TRACER =
+      LoggerFactory.getLogger("TraceIO");
+
+  private final Configuration conf;
+  /**
+   * LevelDB cache store; for now we use a 256 MB off-heap cache in LevelDB.
+   */
+  private final LevelDBStore cacheDB;
+  private final int cacheSizeMb = 256;
+
+  /**
+   * Async block writer updates the cacheDB and asynchronously writes the
+   * blocks to remote containers.
+   */
+  private final AsyncBlockWriter blockWriter;
+
+  /**
+   * Sync block reader tries to read from the cache first; on a cache miss
+   * it fetches the block from the remote location and asynchronously
+   * updates the cacheDB.
+   */
+  private final SyncBlockReader blockReader;
+  /**
+   * We create a trace ID to make it easy to debug issues.
+   * A trace ID has the following format: IPAddress:VolumeName:blockID:second.
+   * <p>
+   * This will get written down on the data node if we get any failures, so
+   * with this trace ID we can correlate cBlock failures across machines.
+   */
+  private final String userName;
+  private final String volumeName;
+  private final String ipAddressString;
+  private final String tracePrefix;
+
+  /**
+   * From a block ID we are able to get the pipeline by indexing this array.
+   */
+  private final Pipeline[] containerList;
+  private final int blockSize;
+  private XceiverClientManager clientManager;
+  /**
+   * If this flag is enabled, the cache traces all I/O; all reads and writes
+   * are visible in the log with the SHA of the block written. This makes the
+   * system slower, so use it only for debugging or creating trace simulations.
+   */
+  private final boolean traceEnabled;
+  private final boolean enableShortCircuitIO;
+  private final long volumeSize;
+  private long currentCacheSize;
+  private File dbPath;
+  private final ContainerCacheFlusher flusher;
+  private CBlockTargetMetrics cblockTargetMetrics;
+
+  /**
+   * Gets the DB path.
+   * @return the File instance of the DB directory.
+   */
+  public File getDbPath() {
+    return dbPath;
+  }
+
+  /**
+   * Constructor for CBlockLocalCache invoked via the builder.
+   *
+   * @param conf -  Configuration
+   * @param volumeName - volume Name
+   * @param userName - user name
+   * @param containerPipelines - Pipelines that make up this container
+   * @param blockSize - blockSize
+   * @param flusher - flusher to flush data to container
+   * @throws IOException
+   */
+  CBlockLocalCache(
+      Configuration conf, String volumeName,
+      String userName, List<Pipeline> containerPipelines, int blockSize,
+      long volumeSize, ContainerCacheFlusher flusher) throws IOException {
+    this.conf = conf;
+    this.userName = userName;
+    this.volumeName = volumeName;
+    this.blockSize = blockSize;
+    this.flusher = flusher;
+    this.traceEnabled = conf.getBoolean(DFS_CBLOCK_TRACE_IO,
+        DFS_CBLOCK_TRACE_IO_DEFAULT);
+    this.enableShortCircuitIO = conf.getBoolean(
+        DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO,
+        DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO_DEFAULT);
+    dbPath = Paths.get(conf.get(DFS_CBLOCK_DISK_CACHE_PATH_KEY,
+        DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT), userName, volumeName).toFile();
+
+    if (!dbPath.exists() && !dbPath.mkdirs()) {
+      LOG.error("Unable to create the cache paths. Path: {}", dbPath);
+      throw new IllegalArgumentException("Unable to create paths. Path: " +
+          dbPath);
+    }
+    cacheDB = flusher.openDB(dbPath.toString(), cacheSizeMb);
+    this.containerList = containerPipelines.toArray(new
+        Pipeline[containerPipelines.size()]);
+    this.ipAddressString = getHostIP();
+    this.tracePrefix = ipAddressString + ":" + this.volumeName;
+    this.volumeSize = volumeSize;
+
+    blockWriter = new AsyncBlockWriter(conf, this);
+    blockReader = new SyncBlockReader(conf, this);
+    if (this.traceEnabled) {
+      getTracer().info("Task=StartingCache");
+    }
+  }
+
+  private void setClientManager(XceiverClientManager manager) {
+    this.clientManager = manager;
+  }
+
+  private void setCblockTargetMetrics(CBlockTargetMetrics targetMetrics) {
+    this.cblockTargetMetrics = targetMetrics;
+  }
+
+  /**
+   * Returns new builder class that builds a CBlockLocalCache.
+   *
+   * @return Builder
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  public void processDirtyMessage(String fileName) {
+    flusher.processDirtyBlocks(dbPath.toString(), fileName);
+  }
+
+  /**
+   * Get usable disk space.
+   *
+   * @param dbPathString - Path to db
+   * @return long bytes remaining.
+   */
+  private static long getRemainingDiskSpace(String dbPathString) {
+    try {
+      URI fileUri = new URI("file:///");
+      Path dbPath = Paths.get(fileUri).resolve(dbPathString);
+      FileStore disk = Files.getFileStore(dbPath);
+      return disk.getUsableSpace();
+    } catch (URISyntaxException | IOException ex) {
+      LOG.error("Unable to get free space on for path :" + dbPathString);
+    }
+    return 0L;
+  }
+
+  /**
+   * Returns the current maximum cache size.
+   *
+   * @return - cache size in bytes.
+   */
+  public long getCurrentCacheSize() {
+    return currentCacheSize;
+  }
 
-  private CBlockLocalCache() {
+  /**
+   * Sets the current maximum cache size.
+   *
+   * @param currentCacheSize - maximum cache size in bytes.
+   */
+  public void setCurrentCacheSize(long currentCacheSize) {
+    this.currentCacheSize = currentCacheSize;
   }
 
+  /**
+   * True if block tracing is enabled.
+   *
+   * @return - bool
+   */
+  public boolean isTraceEnabled() {
+    return traceEnabled;
+  }
+
+  /**
+   * Checks if Short Circuit I/O is enabled.
+   *
+   * @return - true if it is enabled.
+   */
+  public boolean isShortCircuitIOEnabled() {
+    return enableShortCircuitIO;
+  }
+
+  /**
+   * Returns the default block size of this device.
+   *
+   * @return - int
+   */
+  public int getBlockSize() {
+    return blockSize;
+  }
+
+  /**
+   * Gets the client manager.
+   *
+   * @return XceiverClientManager
+   */
+  public XceiverClientManager getClientManager() {
+    return clientManager;
+  }
+
+  /**
+   * Checks if the block is cached; if so, returns the cached block.
+   * Otherwise, loads the block from the data source and puts it into the
+   * cache.
+   *
+   * @param blockID - Block ID
+   * @return the block associated to the blockID
+   */
   @Override
   public LogicalBlock get(long blockID) throws IOException {
-    return null;
+    cblockTargetMetrics.incNumReadOps();
+    return blockReader.readBlock(blockID);
   }
 
+  /**
+   * Puts the block into the cache and the remote container.
+   *
+   * @param blockID - BlockID
+   * @param data - byte[]
+   */
   @Override
   public void put(long blockID, byte[] data) throws IOException {
+    cblockTargetMetrics.incNumWriteOps();
+    LogicalBlock block = new DiskBlock(blockID, data, false);
+    blockWriter.writeBlock(block);
   }
 
   @Override
@@ -56,32 +298,179 @@ final public class CBlockLocalCache implements CacheModule {
 
   @Override
   public void start() throws IOException {
-
+    // This is a No-op for us. We start when we bootup.
   }
 
   @Override
   public void stop() throws IOException {
-
   }
 
   @Override
   public void close() throws IOException {
-
+    blockReader.shutdown();
+    blockWriter.shutdown();
+    this.flusher.closeDB(dbPath.toString());
+    if (this.traceEnabled) {
+      getTracer().info("Task=ShutdownCache");
+    }
   }
 
+  /**
+   * Returns true if cache still has blocks pending to write.
+   *
+   * @return false if we have no pending blocks to write.
+   */
   @Override
   public boolean isDirtyCache() {
     return false;
   }
 
-  public static Builder newBuilder() {
-    return new Builder();
+
+  /**
+   * Tries to get the local host IP Address for creating trace IDs.
+   */
+  private String getHostIP() {
+    String tmp;
+    try {
+      tmp = InetAddress.getLocalHost().toString();
+    } catch (UnknownHostException ex) {
+      tmp = UUID.randomUUID().toString();
+      LOG.error("Unable to read the host address. Using a GUID for " +
+          "hostname:{} ", tmp, ex);
+    }
+    return tmp;
+  }
+
+  /**
+   * Returns the local cache DB.
+   *
+   * @return - DB
+   */
+  LevelDBStore getCacheDB() {
+    return this.cacheDB;
+  }
+
+  /**
+   * Returns the current userName.
+   *
+   * @return - UserName
+   */
+  String getUserName() {
+    return this.userName;
+  }
+
+  /**
+   * Returns the volume name.
+   *
+   * @return VolumeName.
+   */
+  String getVolumeName() {
+    return this.volumeName;
+  }
+
+  /**
+   * Returns the target metrics.
+   *
+   * @return CBlock Target Metrics.
+   */
+  CBlockTargetMetrics getTargetMetrics() {
+    return this.cblockTargetMetrics;
+  }
+
+  /**
+   * Returns the pipeline to use given a container.
+   *
+   * @param blockId - blockID
+   * @return - pipeline.
+   */
+  Pipeline getPipeline(long blockId) {
+    // Compute the modulo in long space to avoid a negative index for
+    // block IDs beyond the int range.
+    int containerIdx = (int) (blockId % containerList.length);
+    long cBlockIndex =
+        Longs.fromByteArray(containerList[containerIdx].getData());
+    if (cBlockIndex > 0) {
+      // This catches the case when we get a wrong container in the ordering
+      // of the containers.
+      Preconditions.checkState(containerIdx % cBlockIndex == 0,
+          "The container ID computed should match with the container index " +
+              "returned from cBlock Server.");
+    }
+    return containerList[containerIdx];
+  }
+
+  /**
+   * Returns a traceID based on the Block ID.
+   * The format is HostIP:VolumeName:BlockID:timeStamp, in case of error this
+   * will be logged on the container side.
+   *
+   * @param blockID - Block ID
+   * @return trace ID
+   */
+  String getTraceID(long blockID) {
+    // mapping to seconds to make the string smaller.
+    return this.tracePrefix + ":" + blockID + ":"
+        + Time.monotonicNow() / 1000;
+  }
+
+  /**
+   * Returns tracer.
+   *
+   * @return - Logger
+   */
+  Logger getTracer() {
+    return TRACER;
   }
 
   /**
    * Builder class for CBlocklocalCache.
    */
   public static class Builder {
+    private Configuration configuration;
+    private String userName;
+    private String volumeName;
+    private List<Pipeline> pipelines;
+    private XceiverClientManager clientManager;
+    private int blockSize;
+    private long volumeSize;
+    private ContainerCacheFlusher flusher;
+    private CBlockTargetMetrics metrics;
+
+    /**
+     * Ctor.
+     */
+    Builder() {
+    }
+
+    /**
+     * Computes a cache size based on the configuration and available disk
+     * space.
+     *
+     * @param configuration - Config
+     * @param volumeSize - Size of Volume
+     * @param blockSize - Size of the block
+     * @return - cache size in bytes.
+     */
+    private static long computeCacheSize(Configuration configuration,
+        long volumeSize, int blockSize) {
+      long cacheSize = 0;
+      String dbPath = configuration.get(DFS_CBLOCK_DISK_CACHE_PATH_KEY,
+          DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT);
+      if (StringUtils.isBlank(dbPath)) {
+        return cacheSize;
+      }
+      long spaceRemaining = getRemainingDiskSpace(dbPath);
+      double cacheRatio = 1.0;
+
+      if (spaceRemaining < volumeSize) {
+        cacheRatio = (double)spaceRemaining / volumeSize;
+      }
+
+      // If the cache is going to be at least 10% of the volume size it is
+      // worth doing; otherwise skip creating the cache.
+      if (cacheRatio >= 0.10) {
+        // Cast the numeric value; Double.doubleToLongBits would return the
+        // raw IEEE-754 bit pattern, not the size in bytes.
+        cacheSize = (long) (volumeSize * cacheRatio);
+      }
+      return cacheSize;
+    }
 
     /**
      * Sets the Config to be used by this cache.
@@ -90,6 +479,7 @@ final public class CBlockLocalCache implements CacheModule {
      * @return Builder
      */
     public Builder setConfiguration(Configuration configuration) {
+      this.configuration = configuration;
       return this;
     }
 
@@ -101,6 +491,7 @@ final public class CBlockLocalCache implements CacheModule {
      * @return - Builder
      */
     public Builder setUserName(String userName) {
+      this.userName = userName;
       return this;
     }
 
@@ -111,6 +502,7 @@ final public class CBlockLocalCache implements CacheModule {
      * @return Builder
      */
     public Builder setVolumeName(String volumeName) {
+      this.volumeName = volumeName;
       return this;
     }
 
@@ -121,6 +513,7 @@ final public class CBlockLocalCache implements CacheModule {
      * @return Builder
      */
     public Builder setPipelines(List<Pipeline> pipelines) {
+      this.pipelines = pipelines;
       return this;
     }
 
@@ -131,6 +524,7 @@ final public class CBlockLocalCache implements CacheModule {
      * @return - Builder
      */
     public Builder setClientManager(XceiverClientManager clientManager) {
+      this.clientManager = clientManager;
       return this;
     }
 
@@ -141,6 +535,7 @@ final public class CBlockLocalCache implements CacheModule {
      * @return - Builder
      */
     public Builder setBlockSize(int blockSize) {
+      this.blockSize = blockSize;
       return this;
     }
 
@@ -151,6 +546,7 @@ final public class CBlockLocalCache implements CacheModule {
      * @return - Builder
      */
     public Builder setVolumeSize(long volumeSize) {
+      this.volumeSize = volumeSize;
       return this;
     }
 
@@ -160,6 +556,7 @@ final public class CBlockLocalCache implements CacheModule {
      * @return Builder.
      */
     public Builder setFlusher(ContainerCacheFlusher flusher) {
+      this.flusher = flusher;
       return this;
     }
 
@@ -170,11 +567,52 @@ final public class CBlockLocalCache implements CacheModule {
      * @return - Builder
      */
     public Builder setCBlockTargetMetrics(CBlockTargetMetrics targetMetrics) {
+      this.metrics = targetMetrics;
       return this;
     }
 
+    /**
+     * Constructs a CBlockLocalCache.
+     *
+     * @return the CBlockLocalCache with the preset properties.
+     * @throws IOException
+     */
     public CBlockLocalCache build() throws IOException {
-      return new CBlockLocalCache();
+      Preconditions.checkNotNull(this.configuration, "A valid configuration " +
+          "is needed");
+      Preconditions.checkState(StringUtils.isNotBlank(userName), "A valid " +
+          "username is needed");
+      Preconditions.checkState(StringUtils.isNotBlank(volumeName), "A valid " +
+          "volume name is needed");
+      Preconditions.checkNotNull(this.pipelines, "Pipelines cannot be null");
+      Preconditions.checkState(this.pipelines.size() > 0, "At least one " +
+          "pipeline location is needed for a volume");
+
+      for (int x = 0; x < pipelines.size(); x++) {
+        Preconditions.checkNotNull(pipelines.get(x).getData(), "cBlock " +
+            "relies on private data on the pipeline, null data found.");
+      }
+
+      Preconditions.checkNotNull(clientManager, "Client Manager cannot be " +
+          "null");
+      Preconditions.checkState(blockSize > 0, "Block size has to be a " +
+          "number greater than 0");
+
+      Preconditions.checkState(volumeSize > 0, "Volume Size cannot be less " +
+          "than 1");
+      Preconditions.checkNotNull(this.flusher, "Flusher cannot be null.");
+
+      CBlockLocalCache cache = new CBlockLocalCache(this.configuration,
+          this.volumeName, this.userName, this.pipelines, blockSize,
+          volumeSize, flusher);
+      cache.setCblockTargetMetrics(this.metrics);
+      cache.setClientManager(this.clientManager);
+
+      // TODO : Support user configurable maximum size.
+      long cacheSize = computeCacheSize(this.configuration, this.volumeSize,
+          this.blockSize);
+      cache.setCurrentCacheSize(cacheSize);
+      return cache;
     }
   }
 }
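
The sizing rule in computeCacheSize() is worth a quick numeric check. With a 50 GB volume and 10 GB of usable disk, the ratio is 0.2, so a 10 GB cache is created; with only 2 GB free, the ratio is 0.04, below the 10% cut-off, so no cache is created. A standalone sketch of the same rule (hypothetical class and method names, config lookup omitted):

    public final class CacheSizeSketch {
      // Mirrors computeCacheSize(): cache at least 10% of the volume size,
      // or skip the cache entirely.
      static long cacheSizeFor(long volumeSize, long spaceRemaining) {
        double cacheRatio = spaceRemaining < volumeSize
            ? (double) spaceRemaining / volumeSize : 1.0;
        return cacheRatio >= 0.10 ? (long) (volumeSize * cacheRatio) : 0L;
      }

      public static void main(String[] args) {
        long gb = 1024L * 1024L * 1024L;
        System.out.println(cacheSizeFor(50 * gb, 10 * gb)); // 10 GB cache
        System.out.println(cacheSizeFor(50 * gb, 2 * gb));  // 0, below cut-off
      }
    }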

+ 259 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/SyncBlockReader.java

@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cblock.jscsiHelper.cache.impl;
+
+import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
+import org.apache.hadoop.utils.LevelDBStore;
+import org.iq80.leveldb.DBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Reads blocks from the container via the local cache.
+ */
+public class SyncBlockReader {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SyncBlockReader.class);
+
+  /**
+   * Update queue - the reason why we have this queue is that we want to
+   * return the block as soon as we read it from the containers. This is a
+   * work queue that takes the read block and updates the cache. During
+   * testing we found LevelDB is slow during writes, hence we want to return
+   * the block as soon as possible and update LevelDB asynchronously.
+   */
+  private static final int QUEUE_SIZE = 1024;
+  /**
+   * Config.
+   */
+  private final Configuration conf;
+  /**
+   * The parent cache this reader is operating against.
+   */
+  private final CBlockLocalCache parentCache;
+  private final BlockingQueue<Runnable> updateQueue;
+
+  /**
+   * The executor is used for running LevelDB updates. In the future we might
+   * do read-aheads, and this pool is useful for that too. We don't share an
+   * executor for reads and writes because the write task is a couple of
+   * orders of magnitude slower than the read task, and we don't want the DB
+   * updates to queue up behind the writes.
+   */
+  private final ThreadPoolExecutor executor;
+
+  /**
+   * Number of threads that pool starts with.
+   */
+  private final int corePoolSize = 1;
+  /**
+   * Maximum number of threads our pool will ever create.
+   */
+  private final int maxPoolSize = 10;
+  /**
+   * The idle time a thread hangs around waiting for work. If no new work
+   * arrives within 60 seconds, the worker thread is killed.
+   */
+  private final long keepAlive = 60L;
+
+  /**
+   * Constructs a SyncBlock reader.
+   *
+   * @param conf - Configuration
+   * @param cache - Cache
+   */
+  public SyncBlockReader(Configuration conf, CBlockLocalCache cache) {
+    this.conf = conf;
+    this.parentCache = cache;
+    updateQueue = new ArrayBlockingQueue<>(QUEUE_SIZE, true);
+    ThreadFactory workerThreadFactory = new ThreadFactoryBuilder()
+        .setNameFormat("SyncBlockReader Thread #%d").setDaemon(true).build();
+    executor = new HadoopThreadPoolExecutor(
+        corePoolSize, maxPoolSize, keepAlive, TimeUnit.SECONDS,
+        updateQueue, workerThreadFactory,
+        new ThreadPoolExecutor.CallerRunsPolicy());
+  }
+
+  /**
+   * Returns the cache DB.
+   *
+   * @return LevelDB
+   */
+  private LevelDBStore getCacheDB() {
+    return parentCache.getCacheDB();
+  }
+
+  /**
+   * Returns data from the local cache if found, else reads from the remote
+   * container.
+   *
+   * @param blockID - blockID
+   * @return LogicalBlock
+   */
+  LogicalBlock readBlock(long blockID) throws IOException {
+    XceiverClientSpi client = null;
+    byte[] data = getBlockFromDB(blockID);
+    if (data != null) {
+      parentCache.getTargetMetrics().incNumReadCacheHits();
+      return new DiskBlock(blockID, data, false);
+    }
+
+    parentCache.getTargetMetrics().incNumReadCacheMiss();
+    try {
+      client = parentCache.getClientManager()
+          .acquireClient(parentCache.getPipeline(blockID));
+      LogicalBlock block = getBlockFromContainer(blockID, client);
+      return block;
+    } finally {
+      if (client != null) {
+        parentCache.getClientManager().releaseClient(client);
+      }
+    }
+  }
+
+  /**
+   * Gets data from the DB if it exists.
+   *
+   * @param blockID - block id
+   * @return data
+   */
+  private byte[] getBlockFromDB(long blockID) {
+    try {
+      if (parentCache.isShortCircuitIOEnabled()) {
+        long startTime = Time.monotonicNow();
+        byte[] data = getCacheDB().get(Longs.toByteArray(blockID));
+        long endTime = Time.monotonicNow();
+
+        if (parentCache.isTraceEnabled()) {
+          parentCache.getTracer().info(
+              "Task=ReadTaskDBRead,BlockID={},SHA={},Time={}",
+              blockID, (data != null && data.length > 0)
+                  ? DigestUtils.sha256Hex(data) : null,
+              endTime - startTime);
+        }
+        parentCache.getTargetMetrics().updateDBReadLatency(
+            endTime - startTime);
+        return data;
+      }
+    } catch (DBException dbe) {
+      LOG.error("Error while reading from cacheDB.", dbe);
+      throw dbe;
+    }
+    return null;
+  }
+
+  /**
+   * Returns a block from a remote container. If the key is not found on the
+   * remote container, we just return a block initialized with zeros.
+   *
+   * @param blockID - blockID
+   * @param client - client
+   * @return LogicalBlock
+   * @throws IOException
+   */
+  private LogicalBlock getBlockFromContainer(long blockID,
+      XceiverClientSpi client) throws IOException {
+    String containerName = parentCache.getPipeline(blockID).getContainerName();
+    try {
+      long startTime = Time.monotonicNow();
+      ContainerProtos.GetSmallFileResponseProto response =
+          ContainerProtocolCalls.readSmallFile(client, containerName,
+              Long.toString(blockID), parentCache.getTraceID(blockID));
+      long endTime = Time.monotonicNow();
+      if (parentCache.isTraceEnabled()) {
+        parentCache.getTracer().info(
+            "Task=ReadTaskContainerRead,BlockID={},SHA={},Time={}",
+            blockID, response.getData().getData().toByteArray().length > 0 ?
+                DigestUtils.sha256Hex(response.getData()
+                    .getData().toByteArray()) : null, endTime - startTime);
+      }
+
+      parentCache.getTargetMetrics().updateContainerReadLatency(
+          endTime - startTime);
+      DiskBlock block = new DiskBlock(blockID,
+          response.getData().getData().toByteArray(), false);
+
+      if (parentCache.isShortCircuitIOEnabled()) {
+        queueUpdateTask(block);
+      }
+
+      return block;
+    } catch (IOException ex) {
+      if (ex instanceof StorageContainerException) {
+        parentCache.getTargetMetrics().incNumReadLostBlocks();
+        StorageContainerException sce = (StorageContainerException) ex;
+        if (sce.getResult() == ContainerProtos.Result.NO_SUCH_KEY ||
+            sce.getResult() == ContainerProtos.Result.IO_EXCEPTION) {
+          return new DiskBlock(blockID, new byte[parentCache.getBlockSize()],
+              false);
+        }
+      }
+      throw ex;
+    }
+  }
+
+  /**
+   * Updates the cache with the block that we just read.
+   *
+   * @param block - the block that was read from the container
+   */
+  private void queueUpdateTask(final DiskBlock block) {
+    Runnable updateTask = () -> {
+      if (block.getData().array().length > 0) {
+        getCacheDB().put(Longs.toByteArray(block.getBlockID()),
+            block.getData().array());
+        block.setPersisted(true);
+      } else {
+        LOG.error("Refusing to update the a null block in the local cache.");
+      }
+    };
+    if (this.executor.isShutdown() || this.executor.isTerminated()) {
+      LOG.error("Thread executor is not running.");
+    } else {
+      this.executor.submit(updateTask);
+    }
+  }
+
+  /**
+   * This is the read path; we don't care if the cache update for the last
+   * block we read is lost on shutdown.
+   */
+  void shutdown() {
+    this.executor.shutdownNow();
+  }
+}
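
The read path in readBlock() is a classic read-through cache: consult the local store, fall back to the remote container, and update the cache off the critical path. A condensed, self-contained sketch under stand-in assumptions (an in-memory map instead of LevelDB, a zero-filled array instead of the container call):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public final class ReadThroughSketch {
      private final Map<Long, byte[]> localCache = new ConcurrentHashMap<>();
      private final ExecutorService updater =
          Executors.newSingleThreadExecutor();

      public byte[] read(long blockId) {
        byte[] cached = localCache.get(blockId);
        if (cached != null) {
          return cached;                        // cache hit, no round trip
        }
        byte[] remote = fetchRemote(blockId);   // stands in for readSmallFile()
        // Return immediately; update the cache asynchronously, as
        // readBlock() does via its HadoopThreadPoolExecutor.
        updater.submit(() -> localCache.put(blockId, remote));
        return remote;
      }

      private byte[] fetchRemote(long blockId) {
        return new byte[4096];                  // zero-filled, like NO_SUCH_KEY
      }
    }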

+ 431 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java

@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.cblock;
+
+import com.google.common.primitives.Longs;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.cblock.jscsiHelper.CBlockIStorageImpl;
+import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics;
+import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher;
+import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
+import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static java.lang.Math.abs;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_DISK_CACHE_PATH_KEY;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
+import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_TRACE_IO;
+
+/**
+ * Tests for the local cache.
+ */
+public class TestLocalBlockCache {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestLocalBlockCache.class);
+  private final static long GB = 1024 * 1024 * 1024;
+  private final static int KB = 1024;
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration config;
+  private static StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocationClient;
+  private static XceiverClientManager xceiverClientManager;
+
+  @BeforeClass
+  public static void init() throws IOException {
+    config = new OzoneConfiguration();
+    URL p = config.getClass().getResource("");
+    String path = p.getPath().concat(
+        TestOzoneContainer.class.getSimpleName());
+    config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
+    config.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    config.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
+    cluster = new MiniOzoneCluster.Builder(config)
+        .numDataNodes(1).setHandlerType("distributed").build();
+    storageContainerLocationClient = cluster
+        .createStorageContainerLocationClient();
+    xceiverClientManager = new XceiverClientManager(config);
+  }
+
+  @AfterClass
+  public static void shutdown() throws InterruptedException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    IOUtils.cleanup(null, storageContainerLocationClient, cluster);
+  }
+
+  /**
+   * getContainerPipeline creates a set of containers and returns the
+   * Pipelines that define those containers.
+   *
+   * @param count - Number of containers to create.
+   * @return - List of Pipelines.
+   * @throws IOException
+   */
+  private List<Pipeline> getContainerPipeline(int count) throws IOException {
+    List<Pipeline> containerPipelines = new LinkedList<>();
+    for (int x = 0; x < count; x++) {
+      String traceID = "trace" + RandomStringUtils.randomNumeric(4);
+      String containerName = "container" + RandomStringUtils.randomNumeric(10);
+      Pipeline pipeline =
+          storageContainerLocationClient.allocateContainer(containerName);
+      XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
+      ContainerProtocolCalls.createContainer(client, traceID);
+      // This step is needed since we set private data on pipelines, when we
+      // read the list from CBlockServer. So we mimic that action here.
+      pipeline.setData(Longs.toByteArray(x));
+      containerPipelines.add(pipeline);
+    }
+    return containerPipelines;
+  }
+
+  /**
+   * This test creates a cache and performs a simple write / read.
+   * Due to the cache, we have read-after-write consistency for cBlocks.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testCacheWriteRead() throws IOException,
+      InterruptedException, TimeoutException {
+    final long blockID = 0;
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+    String data = RandomStringUtils.random(4 * KB);
+    String dataHash = DigestUtils.sha256Hex(data);
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(this.config,
+        xceiverClientManager, metrics);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(this.config)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(getContainerPipeline(10))
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    cache.put(blockID, data.getBytes(StandardCharsets.UTF_8));
+    Assert.assertEquals(1, metrics.getNumWriteOps());
+    // Please note that this read is from the local cache.
+    LogicalBlock block = cache.get(blockID);
+    Assert.assertEquals(1, metrics.getNumReadOps());
+    Assert.assertEquals(1, metrics.getNumReadCacheHits());
+    Assert.assertEquals(0, metrics.getNumReadCacheMiss());
+    Assert.assertEquals(0, metrics.getNumReadLostBlocks());
+
+    cache.put(blockID + 1, data.getBytes(StandardCharsets.UTF_8));
+    Assert.assertEquals(2, metrics.getNumWriteOps());
+    // Please note that this read is from the local cache.
+    block = cache.get(blockID + 1);
+    Assert.assertEquals(2, metrics.getNumReadOps());
+    Assert.assertEquals(2, metrics.getNumReadCacheHits());
+    Assert.assertEquals(0, metrics.getNumReadCacheMiss());
+    Assert.assertEquals(0, metrics.getNumReadLostBlocks());
+    String readHash = DigestUtils.sha256Hex(block.getData().array());
+    Assert.assertEquals("File content does not match.", dataHash, readHash);
+    GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
+    cache.close();
+  }
+
+  @Test
+  public void testCacheWriteToRemoteContainer() throws IOException,
+      InterruptedException, TimeoutException {
+    final long blockID = 0;
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+    String data = RandomStringUtils.random(4 * KB);
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(this.config,
+        xceiverClientManager, metrics);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(this.config)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(getContainerPipeline(10))
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    cache.put(blockID, data.getBytes(StandardCharsets.UTF_8));
+    GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
+    cache.close();
+  }
+
+  @Test
+  public void testCacheWriteToRemote50KBlocks() throws IOException,
+      InterruptedException, TimeoutException {
+    final long totalBlocks = 50 * 1000;
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+    String data = RandomStringUtils.random(4 * KB);
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(this.config,
+        xceiverClientManager, metrics);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(this.config)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(getContainerPipeline(10))
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * 1024)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    long startTime = Time.monotonicNow();
+    for (long blockid = 0; blockid < totalBlocks; blockid++) {
+      cache.put(blockid, data.getBytes(StandardCharsets.UTF_8));
+    }
+    Assert.assertEquals(totalBlocks, metrics.getNumWriteOps());
+    LOG.info("Wrote 50K blocks, waiting for replication to finish.");
+    GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
+    long endTime = Time.monotonicNow();
+    LOG.info("Time taken for writing {} blocks is {} seconds", totalBlocks,
+        TimeUnit.MILLISECONDS.toSeconds(endTime - startTime));
+    // TODO: Read this data back.
+    cache.close();
+  }
+
+  @Test
+  public void testCacheInvalidBlock() throws IOException {
+    final int blockID = 1024;
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(this.config,
+        xceiverClientManager, metrics);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(this.config)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(getContainerPipeline(10))
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    // Read a non-existent block ID.
+    LogicalBlock block = cache.get(blockID);
+    Assert.assertNotNull(block);
+    Assert.assertEquals(4 * 1024, block.getData().array().length);
+    Assert.assertEquals(1, metrics.getNumReadOps());
+    Assert.assertEquals(1, metrics.getNumReadLostBlocks());
+    Assert.assertEquals(1, metrics.getNumReadCacheMiss());
+  }
+
+  @Test
+  public void testReadWriteCorrectness() throws IOException,
+      InterruptedException, TimeoutException {
+    Random r = new Random();
+    final int maxBlock = 12500000;
+    final int blockCount = 10 * 1000;
+    Map<Long, String> blockShaMap = new HashMap<>();
+    List<Pipeline> pipelines = getContainerPipeline(10);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(this.config,
+        xceiverClientManager, metrics);
+    final CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(this.config)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(pipelines)
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    for (int x = 0; x < blockCount; x++) {
+      String data = RandomStringUtils.random(4 * 1024);
+      String dataHash = DigestUtils.sha256Hex(data);
+      long blockId = abs(r.nextInt(maxBlock));
+      blockShaMap.put(blockId, dataHash);
+      cache.put(blockId, data.getBytes(StandardCharsets.UTF_8));
+    }
+    Assert.assertEquals(blockCount, metrics.getNumWriteOps());
+    GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
+    LOG.info("Finished with putting blocks ..starting reading blocks back. " +
+        "unique blocks : {}", blockShaMap.size());
+    // Test reading from local cache.
+    for (Map.Entry<Long, String> entry : blockShaMap.entrySet()) {
+      LogicalBlock block = cache.get(entry.getKey());
+      String blockSha = DigestUtils.sha256Hex(block.getData().array());
+      Assert.assertEquals("Block data is not equal", entry.getValue(),
+          blockSha);
+    }
+    Assert.assertEquals(blockShaMap.size(), metrics.getNumReadOps());
+    Assert.assertEquals(blockShaMap.size(), metrics.getNumReadCacheHits());
+    Assert.assertEquals(0, metrics.getNumReadCacheMiss());
+    Assert.assertEquals(0, metrics.getNumReadLostBlocks());
+
+    LOG.info("Finished with reading blocks, SUCCESS.");
+    // Close and discard local cache.
+    cache.close();
+    LOG.info("Closing the and destroying local cache");
+    CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher newflusher = new ContainerCacheFlusher(this.config,
+        xceiverClientManager, newMetrics);
+    Assert.assertEquals(0, newMetrics.getNumReadCacheHits());
+    CBlockLocalCache newCache = null;
+    try {
+      newCache = CBlockLocalCache.newBuilder()
+          .setConfiguration(this.config)
+          .setVolumeName(volumeName)
+          .setUserName(userName)
+          .setPipelines(pipelines)
+          .setClientManager(xceiverClientManager)
+          .setBlockSize(4 * KB)
+          .setVolumeSize(50 * GB)
+          .setFlusher(newflusher)
+          .setCBlockTargetMetrics(newMetrics)
+          .build();
+
+      for (Map.Entry<Long, String> entry : blockShaMap.entrySet()) {
+        LogicalBlock block = newCache.get(entry.getKey());
+        String blockSha = DigestUtils.sha256Hex(block.getData().array());
+        Assert.assertEquals("Block data is not equal", entry.getValue(),
+            blockSha);
+      }
+
+      Assert.assertEquals(blockShaMap.size(), newMetrics.getNumReadOps());
+      Assert.assertEquals(blockShaMap.size(), newMetrics.getNumReadCacheHits());
+      Assert.assertEquals(0, newMetrics.getNumReadCacheMiss());
+      Assert.assertEquals(0, newMetrics.getNumReadLostBlocks());
+
+      LOG.info("Finished with reading blocks from remote cache, SUCCESS.");
+    } finally {
+      if (newCache != null) {
+        newCache.close();
+      }
+    }
+  }
+
+  @Test
+  public void testStorageImplReadWrite() throws IOException,
+      InterruptedException, TimeoutException {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    long volumeSize = 50L * (1024L * 1024L * 1024L);
+    int blockSize = 4096;
+    byte[] data =
+        RandomStringUtils.randomAlphanumeric(10 * (1024 * 1024))
+            .getBytes(StandardCharsets.UTF_8);
+    String hash = DigestUtils.sha256Hex(data);
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(this.config,
+        xceiverClientManager, metrics);
+    CBlockIStorageImpl ozoneStore = CBlockIStorageImpl.newBuilder()
+        .setUserName(userName)
+        .setVolumeName(volumeName)
+        .setVolumeSize(volumeSize)
+        .setBlockSize(blockSize)
+        .setContainerList(getContainerPipeline(10))
+        .setClientManager(xceiverClientManager)
+        .setConf(this.config)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    ozoneStore.write(data, 0);
+
+    byte[] newData = new byte[10 * 1024 * 1024];
+    ozoneStore.read(newData, 0);
+    String newHash = DigestUtils.sha256Hex(newData);
+    Assert.assertEquals("hashes don't match.", hash, newHash);
+    GenericTestUtils.waitFor(() -> !ozoneStore.getCache().isDirtyCache(),
+        100, 20 * 1000);
+    ozoneStore.close();
+  }
+
+  //@Test
+  // Disabling this test for the time being since a bug in JSCSI
+  // forces us to always have a local cache.
+  public void testStorageImplNoLocalCache() throws IOException,
+      InterruptedException, TimeoutException {
+    OzoneConfiguration oConfig = new OzoneConfiguration();
+    oConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false);
+    oConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    long volumeSize = 50L * (1024L * 1024L * 1024L);
+    int blockSize = 4096;
+    byte[] data =
+        RandomStringUtils.randomAlphanumeric(10 * (1024 * 1024))
+            .getBytes(StandardCharsets.UTF_8);
+    String hash = DigestUtils.sha256Hex(data);
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(oConfig,
+        xceiverClientManager, metrics);
+    CBlockIStorageImpl ozoneStore = CBlockIStorageImpl.newBuilder()
+        .setUserName(userName)
+        .setVolumeName(volumeName)
+        .setVolumeSize(volumeSize)
+        .setBlockSize(blockSize)
+        .setContainerList(getContainerPipeline(10))
+        .setClientManager(xceiverClientManager)
+        .setConf(oConfig)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    ozoneStore.write(data, 0);
+
+    byte[] newData = new byte[10 * 1024 * 1024];
+    ozoneStore.read(newData, 0);
+    String newHash = DigestUtils.sha256Hex(newData);
+    Assert.assertEquals("hashes don't match.", hash, newHash);
+    GenericTestUtils.waitFor(() -> !ozoneStore.getCache().isDirtyCache(),
+        100, 20 * 1000);
+    ozoneStore.close();
+  }
+}

+ 0 - 157
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestStorageImpl.java

@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.cblock;
-
-import com.google.common.primitives.Longs;
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.hadoop.cblock.jscsiHelper.CBlockIStorageImpl;
-import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics;
-import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfiguration;
-import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
-import org.apache.hadoop.scm.XceiverClientManager;
-import org.apache.hadoop.scm.XceiverClientSpi;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.LinkedList;
-import java.util.List;
-
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_DISK_CACHE_PATH_KEY;
-import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_TRACE_IO;
-
-/**
- * This class tests the cblock storage layer.
- */
-public class TestStorageImpl {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestStorageImpl.class);
-  private final static long GB = 1024 * 1024 * 1024;
-  private final static int KB = 1024;
-  private static MiniOzoneCluster cluster;
-  private static OzoneConfiguration config;
-  private static StorageContainerLocationProtocolClientSideTranslatorPB
-      storageContainerLocationClient;
-  private static XceiverClientManager xceiverClientManager;
-
-  @BeforeClass
-  public static void init() throws IOException {
-    config = new OzoneConfiguration();
-    URL p = config.getClass().getResource("");
-    String path = p.getPath().concat(
-        TestOzoneContainer.class.getSimpleName());
-    config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
-    config.setBoolean(DFS_CBLOCK_TRACE_IO, true);
-    cluster = new MiniOzoneCluster.Builder(config)
-        .numDataNodes(1).setHandlerType("distributed").build();
-    storageContainerLocationClient = cluster
-        .createStorageContainerLocationClient();
-    xceiverClientManager = new XceiverClientManager(config);
-  }
-
-  @AfterClass
-  public static void shutdown() throws InterruptedException {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-    IOUtils.cleanup(null, storageContainerLocationClient, cluster);
-  }
-
-  /**
-   * getContainerPipelines creates a set of containers and returns the
-   * Pipelines that define those containers.
-   *
-   * @param count - Number of containers to create.
-   * @return - List of Pipelines.
-   * @throws IOException
-   */
-  private List<Pipeline> getContainerPipeline(int count) throws IOException {
-    List<Pipeline> containerPipelines = new LinkedList<>();
-    for (int x = 0; x < count; x++) {
-      String traceID = "trace" + RandomStringUtils.randomNumeric(4);
-      String containerName = "container" + RandomStringUtils.randomNumeric(10);
-      Pipeline pipeline =
-          storageContainerLocationClient.allocateContainer(containerName);
-      XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
-      ContainerProtocolCalls.createContainer(client, traceID);
-      // This step is needed since we set private data on pipelines, when we
-      // read the list from CBlockServer. So we mimic that action here.
-      pipeline.setData(Longs.toByteArray(x));
-      containerPipelines.add(pipeline);
-    }
-    return containerPipelines;
-  }
-
-  @Test
-  public void testStorageImplBasicReadWrite() throws Exception {
-    OzoneConfiguration oConfig = new OzoneConfiguration();
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
-    long volumeSize = 50L * (1024L * 1024L * 1024L);
-    int blockSize = 4096;
-    byte[] data =
-        RandomStringUtils.randomAlphanumeric(10 * (1024 * 1024))
-            .getBytes(StandardCharsets.UTF_8);
-    String hash = DigestUtils.sha256Hex(data);
-    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
-    ContainerCacheFlusher flusher = new ContainerCacheFlusher(oConfig,
-        xceiverClientManager, metrics);
-    CBlockIStorageImpl ozoneStore = CBlockIStorageImpl.newBuilder()
-        .setUserName(userName)
-        .setVolumeName(volumeName)
-        .setVolumeSize(volumeSize)
-        .setBlockSize(blockSize)
-        .setContainerList(getContainerPipeline(10))
-        .setClientManager(xceiverClientManager)
-        .setConf(oConfig)
-        .setFlusher(flusher)
-        .setCBlockTargetMetrics(metrics)
-        .build();
-    ozoneStore.write(data, 0);
-
-    // Currently, local cache is a placeholder and does not actually handle
-    // read and write. So the below write is guaranteed to fail. After
-    // CBlockLocalCache is properly implemented, we should uncomment the
-    // following lines
-    // TODO uncomment the following.
-
-    //byte[] newData = new byte[10 * 1024 * 1024];
-    //ozoneStore.read(newData, 0);
-    //String newHash = DigestUtils.sha256Hex(newData);
-    //Assert.assertEquals("hashes don't match.", hash, newHash);
-    GenericTestUtils.waitFor(() -> !ozoneStore.getCache().isDirtyCache(),
-        100, 20 *
-            1000);
-
-    ozoneStore.close();
-  }
-}