
HDFS-13926. ThreadLocal aggregations for FileSystem.Statistics are incorrect with striped reads.
Contributed by Xiao Chen, Hrishikesh Gadre.

Signed-off-by: Xiao Chen <xiao@apache.org>

Hrishikesh Gadre · 6 years ago · commit 08bb6c49a5
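
To make the fix concrete, here is a minimal, self-contained sketch (hypothetical class name, not part of this patch) of why FileSystem.Statistics goes wrong with striped reads: the counters are ThreadLocal, so an update made inside an async reader thread lands on that pool thread's copy, and the initiating thread's view stays at zero. The patch adopts the second pattern below: worker threads return their byte counts, and the initiating thread applies them.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ThreadLocalStatsPitfall {
      private static final ThreadLocal<long[]> BYTES_READ =
          ThreadLocal.withInitial(() -> new long[1]);

      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();

        // Broken pattern: the async task bumps the counter itself, so the
        // update lands on the pool thread's ThreadLocal copy.
        pool.submit(() -> BYTES_READ.get()[0] += 1024).get();
        System.out.println(BYTES_READ.get()[0]); // prints 0

        // Fixed pattern (what this commit does): the task returns the byte
        // count and the initiating thread updates its own copy.
        long n = pool.submit(() -> 1024L).get();
        BYTES_READ.get()[0] += n;
        System.out.println(BYTES_READ.get()[0]); // prints 1024

        pool.shutdown();
      }
    }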

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -90,6 +90,8 @@ import com.google.common.annotations.VisibleForTesting;
 
 import javax.annotation.Nonnull;
 
+import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
+
 /****************************************************************
  * DFSInputStream provides bytes from a named file.  It handles
  * negotiation of the namenode and various datanodes as necessary.
@@ -769,6 +771,12 @@ public class DFSInputStream extends FSInputStream
            // got an EOS from the reader though we expect more data on it.
             throw new IOException("Unexpected EOS from the reader");
           }
+          updateReadStatistics(readStatistics, result, blockReader);
+          dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
+              result);
+          if (readStatistics.getBlockType() == BlockType.STRIPED) {
+            dfsClient.updateFileSystemECReadStats(result);
+          }
           return result;
         } catch (ChecksumException ce) {
           throw ce;

+ 22 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java

@@ -54,6 +54,8 @@ import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadPoolExecutor;
 
+import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
+
 /**
  * DFSStripedInputStream reads from striped block groups.
  */
@@ -328,6 +330,26 @@ public class DFSStripedInputStream extends DFSInputStream {
     curStripeRange = stripeRange;
   }
 
+  /**
+   * Update read statistics. Note that this has to be done on the thread that
+   * initiates the read, rather than inside each async thread, for
+   * {@link org.apache.hadoop.fs.FileSystem.Statistics} to work correctly with
+   * its ThreadLocal.
+   *
+   * @param stats striped read stats
+   */
+  void updateReadStats(final StripedBlockUtil.BlockReadStats stats) {
+    if (stats == null) {
+      return;
+    }
+    updateReadStatistics(readStatistics, stats.getBytesRead(),
+        stats.isShortCircuit(), stats.getNetworkDistance());
+    dfsClient.updateFileSystemReadStats(stats.getNetworkDistance(),
+        stats.getBytesRead());
+    assert readStatistics.getBlockType() == BlockType.STRIPED;
+    dfsClient.updateFileSystemECReadStats(stats.getBytesRead());
+  }
+
   /**
    * Seek to a new arbitrary location.
    */

+ 0 - 15
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReaderStrategy.java

@@ -17,11 +17,8 @@
  */
 package org.apache.hadoop.hdfs;
 
-import org.apache.hadoop.hdfs.protocol.BlockType;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
 
 /**
  * Wraps different possible read implementations so that callers can be
@@ -120,12 +117,6 @@ class ByteArrayStrategy implements ReaderStrategy {
                            int length) throws IOException {
     int nRead = blockReader.read(readBuf, offset, length);
     if (nRead > 0) {
-      updateReadStatistics(readStatistics, nRead, blockReader);
-      dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
-          nRead);
-      if (readStatistics.getBlockType() == BlockType.STRIPED) {
-        dfsClient.updateFileSystemECReadStats(nRead);
-      }
       offset += nRead;
     }
     return nRead;
@@ -190,12 +181,6 @@ class ByteBufferStrategy implements ReaderStrategy {
     // Only when data are read, update the position
     if (nRead > 0) {
       readBuf.position(readBuf.position() + nRead);
-      updateReadStatistics(readStatistics, nRead, blockReader);
-      dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
-          nRead);
-      if (readStatistics.getBlockType() == BlockType.STRIPED) {
-        dfsClient.updateFileSystemECReadStats(nRead);
-      }
     }
 
     return nRead;

+ 15 - 8
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
@@ -105,9 +106,10 @@ abstract class StripeReader {
     }
   }
 
-  protected final Map<Future<Void>, Integer> futures = new HashMap<>();
+  private final Map<Future<BlockReadStats>, Integer> futures =
+      new HashMap<>();
   protected final AlignedStripe alignedStripe;
-  protected final CompletionService<Void> service;
+  private final CompletionService<BlockReadStats> service;
   protected final LocatedBlock[] targetBlocks;
   protected final CorruptedBlocks corruptedBlocks;
   protected final BlockReaderInfo[] readerInfos;
@@ -257,7 +259,7 @@ abstract class StripeReader {
     }
   }
 
-  private Callable<Void> readCells(final BlockReader reader,
+  private Callable<BlockReadStats> readCells(final BlockReader reader,
       final DatanodeInfo datanode, final long currentReaderOffset,
       final long targetReaderOffset, final ByteBufferStrategy[] strategies,
       final ExtendedBlock currentBlock) {
@@ -275,10 +277,13 @@ abstract class StripeReader {
             skipped == targetReaderOffset - currentReaderOffset);
       }
 
+      int ret = 0;
       for (ByteBufferStrategy strategy : strategies) {
-        readToBuffer(reader, datanode, strategy, currentBlock);
+        int bytesRead = readToBuffer(reader, datanode, strategy, currentBlock);
+        ret += bytesRead;
       }
-      return null;
+      return new BlockReadStats(ret, reader.isShortCircuit(),
+          reader.getNetworkDistance());
     };
   }
 
@@ -303,13 +308,14 @@ abstract class StripeReader {
     }
 
     chunk.state = StripingChunk.PENDING;
-    Callable<Void> readCallable = readCells(readerInfos[chunkIndex].reader,
+    Callable<BlockReadStats> readCallable =
+        readCells(readerInfos[chunkIndex].reader,
         readerInfos[chunkIndex].datanode,
         readerInfos[chunkIndex].blockReaderOffset,
         alignedStripe.getOffsetInBlock(), getReadStrategies(chunk),
         block.getBlock());
 
-    Future<Void> request = service.submit(readCallable);
+    Future<BlockReadStats> request = service.submit(readCallable);
     futures.put(request, chunkIndex);
     return true;
   }
@@ -342,6 +348,7 @@ abstract class StripeReader {
       try {
         StripingChunkReadResult r = StripedBlockUtil
             .getNextCompletedStripedRead(service, futures, 0);
+        dfsStripedInputStream.updateReadStats(r.getReadStats());
         if (DFSClient.LOG.isDebugEnabled()) {
           DFSClient.LOG.debug("Read task returned: " + r + ", for stripe "
               + alignedStripe);
@@ -460,7 +467,7 @@ abstract class StripeReader {
   }
 
   void clearFutures() {
-    for (Future<Void> future : futures.keySet()) {
+    for (Future<BlockReadStats> future : futures.keySet()) {
       future.cancel(false);
     }
     futures.clear();
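
In isolation, the new control flow looks like the following hedged sketch (ReadStats and readAllChunks are illustrative stand-ins for BlockReadStats and the futures bookkeeping above): read tasks submitted to the CompletionService return their stats, and the submitting thread drains the service and aggregates on its own side.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;

    class StripeStatsSketch {
      // Stand-in for StripedBlockUtil.BlockReadStats.
      static final class ReadStats {
        final int bytesRead;
        ReadStats(int bytesRead) { this.bytesRead = bytesRead; }
      }

      static int readAllChunks(ExecutorService pool, int numChunks)
          throws Exception {
        CompletionService<ReadStats> service =
            new ExecutorCompletionService<>(pool);
        Map<Future<ReadStats>, Integer> futures = new HashMap<>();
        for (int i = 0; i < numChunks; i++) {
          // Each task pretends to read one 64 KiB chunk.
          futures.put(service.submit(() -> new ReadStats(64 * 1024)), i);
        }
        int total = 0;
        while (!futures.isEmpty()) {
          Future<ReadStats> done = service.take(); // blocks for a finished task
          futures.remove(done);
          total += done.get().bytesRead;           // aggregated on this thread
        }
        return total;
      }
    }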

+ 8 - 2
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java

@@ -48,13 +48,19 @@ public class IOUtilsClient {
 
   public static void updateReadStatistics(ReadStatistics readStatistics,
                                       int nRead, BlockReader blockReader) {
+    updateReadStatistics(readStatistics, nRead, blockReader.isShortCircuit(),
+        blockReader.getNetworkDistance());
+  }
+
+  public static void updateReadStatistics(ReadStatistics readStatistics,
+      int nRead, boolean isShortCircuit, int networkDistance) {
     if (nRead <= 0) {
       return;
     }
 
-    if (blockReader.isShortCircuit()) {
+    if (isShortCircuit) {
       readStatistics.addShortCircuitBytes(nRead);
-    } else if (blockReader.getNetworkDistance() == 0) {
+    } else if (networkDistance == 0) {
       readStatistics.addLocalBytes(nRead);
     } else {
       readStatistics.addRemoteBytes(nRead);
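
A hedged usage sketch of the new primitive-argument overload (assuming the no-arg ReadStatistics constructor): because it no longer needs a live BlockReader, stats captured on a worker thread can be replayed on the reading thread after the fact. Each call falls into one of the branches shown above.

    ReadStatistics rs = new ReadStatistics();
    IOUtilsClient.updateReadStatistics(rs, 4096, true, 0);   // short-circuit bytes
    IOUtilsClient.updateReadStatistics(rs, 4096, false, 0);  // local bytes
    IOUtilsClient.updateReadStatistics(rs, 4096, false, 2);  // remote bytes
    IOUtilsClient.updateReadStatistics(rs, 0, false, 2);     // ignored: nRead <= 0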

+ 60 - 5
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java

@@ -76,6 +76,48 @@ public class StripedBlockUtil {
   public static final Logger LOG =
       LoggerFactory.getLogger(StripedBlockUtil.class);
 
+  /**
+   * Struct holding the read statistics. This is used when reads are done
+   * asynchronously, to allow the async threads to return the read stats and
+   * let the main reading thread update them. This is important for the main
+   * reading thread's ThreadLocal stats to be correct.
+   */
+  public static class BlockReadStats {
+    private final int bytesRead;
+    private final boolean isShortCircuit;
+    private final int networkDistance;
+
+    public BlockReadStats(int numBytesRead, boolean shortCircuit,
+        int distance) {
+      bytesRead = numBytesRead;
+      isShortCircuit = shortCircuit;
+      networkDistance = distance;
+    }
+
+    public int getBytesRead() {
+      return bytesRead;
+    }
+
+    public boolean isShortCircuit() {
+      return isShortCircuit;
+    }
+
+    public int getNetworkDistance() {
+      return networkDistance;
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder();
+      sb.append("bytesRead=").append(bytesRead);
+      sb.append(',');
+      sb.append("isShortCircuit=").append(isShortCircuit);
+      sb.append(',');
+      sb.append("networkDistance=").append(networkDistance);
+      return sb.toString();
+    }
+  }
+
   /**
    * This method parses a striped block group into individual blocks.
    *
@@ -245,10 +287,11 @@ public class StripedBlockUtil {
    * @throws InterruptedException
    */
   public static StripingChunkReadResult getNextCompletedStripedRead(
-      CompletionService<Void> readService, Map<Future<Void>, Integer> futures,
+      CompletionService<BlockReadStats> readService,
+      Map<Future<BlockReadStats>, Integer> futures,
       final long timeoutMillis) throws InterruptedException {
     Preconditions.checkArgument(!futures.isEmpty());
-    Future<Void> future = null;
+    Future<BlockReadStats> future = null;
     try {
       if (timeoutMillis > 0) {
         future = readService.poll(timeoutMillis, TimeUnit.MILLISECONDS);
@@ -256,9 +299,9 @@ public class StripedBlockUtil {
         future = readService.take();
       }
       if (future != null) {
-        future.get();
+        final BlockReadStats stats = future.get();
         return new StripingChunkReadResult(futures.remove(future),
-            StripingChunkReadResult.SUCCESSFUL);
+            StripingChunkReadResult.SUCCESSFUL, stats);
       } else {
         return new StripingChunkReadResult(StripingChunkReadResult.TIMEOUT);
       }
@@ -881,24 +924,36 @@ public class StripedBlockUtil {
 
     public final int index;
     public final int state;
+    private final BlockReadStats readStats;
 
     public StripingChunkReadResult(int state) {
       Preconditions.checkArgument(state == TIMEOUT,
           "Only timeout result should return negative index.");
       this.index = -1;
       this.state = state;
+      this.readStats = null;
     }
 
     public StripingChunkReadResult(int index, int state) {
+      this(index, state, null);
+    }
+
+    public StripingChunkReadResult(int index, int state, BlockReadStats stats) {
       Preconditions.checkArgument(state != TIMEOUT,
           "Timeout result should return negative index.");
       this.index = index;
       this.state = state;
+      this.readStats = stats;
+    }
+
+    public BlockReadStats getReadStats() {
+      return readStats;
     }
 
     @Override
     public String toString() {
-      return "(index=" + index + ", state =" + state + ")";
+      return "(index=" + index + ", state =" + state + ", readStats ="
+          + readStats + ")";
     }
   }
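
End to end, the consumer side now looks like this hedged sketch (simplified from the StripeReader hunk earlier): the completed future's stats ride back on the StripingChunkReadResult, and the initiating thread applies them.

    StripingChunkReadResult r = StripedBlockUtil
        .getNextCompletedStripedRead(readService, futures, 0);
    // updateReadStats tolerates a null readStats (e.g. timeout results),
    // so the caller can apply it unconditionally on the initiating thread.
    dfsStripedInputStream.updateReadStats(r.getReadStats());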
 

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
 
@@ -161,7 +162,7 @@ public final class ErasureCodingWorker {
     return conf;
   }
 
-  CompletionService<Void> createReadService() {
+  CompletionService<BlockReadStats> createReadService() {
     return new ExecutorCompletionService<>(stripedReadPool);
   }
 

+ 8 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
@@ -161,16 +162,15 @@ class StripedBlockReader {
     }
   }
 
-  Callable<Void> readFromBlock(final int length,
+  Callable<BlockReadStats> readFromBlock(final int length,
                                final CorruptedBlocks corruptedBlocks) {
-    return new Callable<Void>() {
+    return new Callable<BlockReadStats>() {
 
       @Override
-      public Void call() throws Exception {
+      public BlockReadStats call() throws Exception {
         try {
           getReadBuffer().limit(length);
-          actualReadFromBlock();
-          return null;
+          return actualReadFromBlock();
         } catch (ChecksumException e) {
           LOG.warn("Found Checksum error for {} from {} at {}", block,
               source, e.getPos());
@@ -187,7 +187,7 @@ class StripedBlockReader {
   /**
    * Perform actual reading of bytes from block.
    */
-  private void actualReadFromBlock() throws IOException {
+  private BlockReadStats actualReadFromBlock() throws IOException {
     int len = buffer.remaining();
     int n = 0;
     while (n < len) {
@@ -198,6 +198,8 @@ class StripedBlockReader {
       n += nread;
       stripedReader.getReconstructor().incrBytesRead(isLocal, nread);
     }
+    return new BlockReadStats(n, blockReader.isShortCircuit(),
+        blockReader.getNetworkDistance());
   }
 
   // close block reader

+ 9 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
 import org.apache.hadoop.util.DataChecksum;
 import org.slf4j.Logger;
@@ -80,8 +81,8 @@ class StripedReader {
 
   private final List<StripedBlockReader> readers;
 
-  private final Map<Future<Void>, Integer> futures = new HashMap<>();
-  private final CompletionService<Void> readService;
+  private final Map<Future<BlockReadStats>, Integer> futures = new HashMap<>();
+  private final CompletionService<BlockReadStats> readService;
 
   StripedReader(StripedReconstructor reconstructor, DataNode datanode,
       Configuration conf, StripedReconstructionInfo stripedReconInfo) {
@@ -289,9 +290,9 @@ class StripedReader {
       int toRead = getReadLength(liveIndices[successList[i]],
           reconstructLength);
       if (toRead > 0) {
-        Callable<Void> readCallable =
+        Callable<BlockReadStats> readCallable =
             reader.readFromBlock(toRead, corruptedBlocks);
-        Future<Void> f = readService.submit(readCallable);
+        Future<BlockReadStats> f = readService.submit(readCallable);
         futures.put(f, successList[i]);
       } else {
         // If the read length is 0, we don't need to do real read
@@ -411,9 +412,9 @@ class StripedReader {
 
     // step3: schedule if find a correct source DN and need to do real read.
     if (reader != null) {
-      Callable<Void> readCallable =
+      Callable<BlockReadStats> readCallable =
           reader.readFromBlock(toRead, corruptedBlocks);
-      Future<Void> f = readService.submit(readCallable);
+      Future<BlockReadStats> f = readService.submit(readCallable);
       futures.put(f, m);
       used.set(m);
     }
@@ -422,8 +423,8 @@ class StripedReader {
   }
 
   // Cancel all reads.
-  private static void cancelReads(Collection<Future<Void>> futures) {
-    for (Future<Void> future : futures) {
+  private static void cancelReads(Collection<Future<BlockReadStats>> futures) {
+    for (Future<BlockReadStats> future : futures) {
       future.cancel(true);
     }
   }

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
 import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
@@ -222,7 +223,7 @@ abstract class StripedReconstructor {
     return cachingStrategy;
   }
 
-  CompletionService<Void> createReadService() {
+  CompletionService<BlockReadStats> createReadService() {
     return erasureCodingWorker.createReadService();
   }
 

+ 44 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystemWithECFile.java

@@ -21,16 +21,21 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.io.IOUtils;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -61,6 +66,9 @@ public class TestDistributedFileSystemWithECFile {
     return StripedFileTestUtil.getDefaultECPolicy();
   }
 
+  @Rule
+  public final Timeout globalTimeout = new Timeout(60000 * 3);
+
   @Before
   public void setup() throws IOException {
     ecPolicy = getEcPolicy();
@@ -249,4 +257,40 @@ public class TestDistributedFileSystemWithECFile {
     assertEquals(rs63, fs.getErasureCodingPolicy(ecFile));
     assertEquals(rs32, fs.getErasureCodingPolicy(ecFile2));
   }
+
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testStatistics() throws Exception {
+    final String fileName = "/ec/file";
+    final int size = 3200;
+    createFile(fileName, size);
+    InputStream in = null;
+    try {
+      in = fs.open(new Path(fileName));
+      IOUtils.copyBytes(in, System.out, 4096, false);
+    } finally {
+      IOUtils.closeStream(in);
+    }
+
+    // verify stats are correct
+    Long totalBytesRead = 0L;
+    Long ecBytesRead = 0L;
+    for (FileSystem.Statistics stat : FileSystem.getAllStatistics()) {
+      totalBytesRead += stat.getBytesRead();
+      ecBytesRead += stat.getBytesReadErasureCoded();
+    }
+    assertEquals(Long.valueOf(size), totalBytesRead);
+    assertEquals(Long.valueOf(size), ecBytesRead);
+
+    // verify thread local stats are correct
+    Long totalBytesReadThread = 0L;
+    Long ecBytesReadThread = 0L;
+    for (FileSystem.Statistics stat : FileSystem.getAllStatistics()) {
+      FileSystem.Statistics.StatisticsData data = stat.getThreadStatistics();
+      totalBytesReadThread += data.getBytesRead();
+      ecBytesReadThread += data.getBytesReadErasureCoded();
+    }
+    assertEquals(Long.valueOf(size), totalBytesReadThread);
+    assertEquals(Long.valueOf(size), ecBytesReadThread);
+  }
 }
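
As a hedged extension (not part of the patch), the same thread-local API can confirm the converse property: a different thread's view of these statistics should not include this thread's striped reads, which is exactly what the fix restores.

    // Hypothetical extra assertion, run after the reads above.
    ExecutorService exec = Executors.newSingleThreadExecutor();
    try {
      long otherThreadBytes = exec.submit(() -> {
        long sum = 0;
        for (FileSystem.Statistics stat : FileSystem.getAllStatistics()) {
          sum += stat.getThreadStatistics().getBytesRead();
        }
        return sum;
      }).get();
      assertEquals(0L, otherThreadBytes);
    } finally {
      exec.shutdownNow();
    }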