
HDFS-11618. Block Storage: Add Support for Direct I/O. Contributed by Mukul Kumar Singh.

Chen Liang 8 years ago
parent
commit
b7463924af

+ 26 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java

@@ -39,6 +39,8 @@ public class CBlockTargetMetrics {
   @Metric private MutableCounterLong numReadCacheHits;
   @Metric private MutableCounterLong numReadCacheMiss;
   @Metric private MutableCounterLong numReadLostBlocks;
+  @Metric private MutableCounterLong numDirectBlockWrites;
+  @Metric private MutableCounterLong numFailedDirectBlockWrites;
 
   @Metric private MutableRate dbReadLatency;
   @Metric private MutableRate containerReadLatency;
@@ -46,6 +48,8 @@ public class CBlockTargetMetrics {
   @Metric private MutableRate dbWriteLatency;
   @Metric private MutableRate containerWriteLatency;
 
+  @Metric private MutableRate directBlockWriteLatency;
+
   public CBlockTargetMetrics() {
   }
 
@@ -76,6 +80,14 @@ public class CBlockTargetMetrics {
     numReadLostBlocks.incr();
   }
 
+  public void incNumDirectBlockWrites() {
+    numDirectBlockWrites.incr();
+  }
+
+  public void incNumFailedDirectBlockWrites() {
+    numFailedDirectBlockWrites.incr();
+  }
+
   public void updateDBReadLatency(long latency) {
     dbReadLatency.add(latency);
   }
@@ -92,6 +104,10 @@ public class CBlockTargetMetrics {
     containerWriteLatency.add(latency);
   }
 
+  public void updateDirectBlockWriteLatency(long latency) {
+    directBlockWriteLatency.add(latency);
+  }
+
   @VisibleForTesting
   public long getNumReadOps() {
     return numReadOps.value();
@@ -116,4 +132,14 @@ public class CBlockTargetMetrics {
   public long getNumReadLostBlocks() {
     return numReadLostBlocks.value();
   }
+
+  @VisibleForTesting
+  public long getNumDirectBlockWrites() {
+    return numDirectBlockWrites.value();
+  }
+
+  @VisibleForTesting
+  public long getNumFailedDirectBlockWrites() {
+    return numFailedDirectBlockWrites.value();
+  }
 }
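
Taken together, the new counters and the rate metric give the direct-write path a standard success / failure / latency pattern. A minimal sketch of how a writer would drive them (the metric methods, CBlockTargetMetrics.create(), and Time.monotonicNow() all appear in this patch; writeBlockDirectly is a hypothetical stand-in for the actual container call):

    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
    long start = Time.monotonicNow();
    try {
      writeBlockDirectly(block);  // hypothetical direct container write
      metrics.updateDirectBlockWriteLatency(Time.monotonicNow() - start);
      metrics.incNumDirectBlockWrites();
    } catch (IOException ex) {
      metrics.incNumFailedDirectBlockWrites();
      throw ex;
    }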

+ 30 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java

@@ -23,6 +23,9 @@ import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.LevelDBStore;
 import org.slf4j.Logger;
@@ -167,10 +170,33 @@ public class AsyncBlockWriter {
       }
       block.clearData();
     } else {
-      // TODO : Support Direct I/O
-      LOG.error("Non-Cache I/O is not supported at this point of time.");
-      throw new IllegalStateException("Cache is required and cannot be " +
-          "disabled now.");
+      Pipeline pipeline = parentCache.getPipeline(block.getBlockID());
+      String containerName = pipeline.getContainerName();
+      try {
+        long startTime = Time.monotonicNow();
+        XceiverClientSpi client = parentCache.getClientManager()
+            .acquireClient(parentCache.getPipeline(block.getBlockID()));
+        // BUG: fix the trace ID.
+        ContainerProtocolCalls.writeSmallFile(client, containerName,
+            Long.toString(block.getBlockID()), block.getData().array(), "");
+        long endTime = Time.monotonicNow();
+        if (parentCache.isTraceEnabled()) {
+          String datahash = DigestUtils.sha256Hex(block.getData().array());
+          parentCache.getTracer().info(
+              "Task=DirectWriterPut,BlockID={},Time={},SHA={}",
+              block.getBlockID(), endTime - startTime, datahash);
+        }
+        parentCache.getTargetMetrics().
+            updateDirectBlockWriteLatency(endTime - startTime);
+        parentCache.getTargetMetrics().incNumDirectBlockWrites();
+      } catch (Exception ex) {
+        parentCache.getTargetMetrics().incNumFailedDirectBlockWrites();
+        LOG.error("Direct I/O writing of block:{} to container {} failed",
+            block.getBlockID(), containerName, ex);
+        throw ex;
+      } finally {
+        block.clearData();
+      }
     }
     if (blockIDBuffer.remaining() <= (Long.SIZE / Byte.SIZE)) {
       long startTime = Time.monotonicNow();
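
The hunk above covers only writes; reads that miss the local cache fetch the block straight from the container through the matching small-file call. A hedged sketch of that complement, assuming ContainerProtocolCalls.readSmallFile is symmetric to the writeSmallFile call used here and that XceiverClientManager exposes a releaseClient method (note the patch itself acquires a client without releasing it):

    Pipeline pipeline = parentCache.getPipeline(blockID);
    XceiverClientSpi client =
        parentCache.getClientManager().acquireClient(pipeline);
    try {
      // Empty trace ID, mirroring the write path above.
      ContainerProtos.GetSmallFileResponseProto response =
          ContainerProtocolCalls.readSmallFile(client,
              pipeline.getContainerName(), Long.toString(blockID), "");
      byte[] data = response.getData().getData().toByteArray();
      // ... hand data back to the caller ...
    } finally {
      parentCache.getClientManager().releaseClient(client);
    }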

+ 59 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java

@@ -428,4 +428,63 @@ public class TestLocalBlockCache {
         100, 20 * 1000);
     ozoneStore.close();
   }
+
+  /**
+   * Creates a local cache with short-circuit I/O disabled and performs a
+   * simple write / read; both operations bypass the cache.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testDirectIO() throws IOException,
+      InterruptedException, TimeoutException {
+    OzoneConfiguration cConfig = new OzoneConfiguration();
+    cConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false);
+    cConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
+    final long blockID = 0;
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
+    String userName = "user" + RandomStringUtils.randomNumeric(4);
+    String data = RandomStringUtils.randomAlphanumeric(4 * KB);
+    String dataHash = DigestUtils.sha256Hex(data);
+    CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
+    ContainerCacheFlusher flusher = new ContainerCacheFlusher(cConfig,
+        xceiverClientManager, metrics);
+    CBlockLocalCache cache = CBlockLocalCache.newBuilder()
+        .setConfiguration(cConfig)
+        .setVolumeName(volumeName)
+        .setUserName(userName)
+        .setPipelines(getContainerPipeline(10))
+        .setClientManager(xceiverClientManager)
+        .setBlockSize(4 * KB)
+        .setVolumeSize(50 * GB)
+        .setFlusher(flusher)
+        .setCBlockTargetMetrics(metrics)
+        .build();
+    Assert.assertFalse(cache.isShortCircuitIOEnabled());
+    cache.put(blockID, data.getBytes(StandardCharsets.UTF_8));
+    Assert.assertEquals(1, metrics.getNumDirectBlockWrites());
+    Assert.assertEquals(1, metrics.getNumWriteOps());
+    // Note: this read goes directly to the remote container.
+    LogicalBlock block = cache.get(blockID);
+    Assert.assertEquals(1, metrics.getNumReadOps());
+    Assert.assertEquals(0, metrics.getNumReadCacheHits());
+    Assert.assertEquals(1, metrics.getNumReadCacheMiss());
+    Assert.assertEquals(0, metrics.getNumReadLostBlocks());
+    Assert.assertEquals(0, metrics.getNumFailedDirectBlockWrites());
+
+    cache.put(blockID + 1, data.getBytes(StandardCharsets.UTF_8));
+    Assert.assertEquals(2, metrics.getNumDirectBlockWrites());
+    Assert.assertEquals(2, metrics.getNumWriteOps());
+    Assert.assertEquals(0, metrics.getNumFailedDirectBlockWrites());
+    // Note: this read goes directly to the remote container.
+    block = cache.get(blockID + 1);
+    Assert.assertEquals(2, metrics.getNumReadOps());
+    Assert.assertEquals(0, metrics.getNumReadCacheHits());
+    Assert.assertEquals(2, metrics.getNumReadCacheMiss());
+    Assert.assertEquals(0, metrics.getNumReadLostBlocks());
+    String readHash = DigestUtils.sha256Hex(block.getData().array());
+    Assert.assertEquals("File content does not match.", dataHash, readHash);
+    GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
+    cache.close();
+  }
 }
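
Operationally, direct I/O is just the cache-bypass mode: this patch introduces no new configuration keys, and the test enables the path through the same constants it already imports. A minimal sketch of that setup:

    OzoneConfiguration conf = new OzoneConfiguration();
    // false disables short-circuit (cached) I/O, forcing direct block writes
    conf.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, false);
    // optional: per-operation trace lines such as
    // Task=DirectWriterPut,BlockID=...,Time=...,SHA=...
    conf.setBoolean(DFS_CBLOCK_TRACE_IO, true);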