فهرست منبع

HDFS-7698. Fix locking on HDFS read statistics and add a method for clearing them. (Colin P. McCabe via yliu)

yliu 10 سال پیش
والد
کامیت
45ea53f938

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -848,6 +848,9 @@ Release 2.7.0 - UNRELEASED
 
     HDFS-7709. Fix findbug warnings in httpfs. (Rakesh R via ozawa)
 
+    HDFS-7698. Fix locking on HDFS read statistics and add a method for
+    clearing them. (Colin P. McCabe via yliu)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 46 - 27
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -131,10 +131,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
 
   public static class ReadStatistics {
     public ReadStatistics() {
-      this.totalBytesRead = 0;
-      this.totalLocalBytesRead = 0;
-      this.totalShortCircuitBytesRead = 0;
-      this.totalZeroCopyBytesRead = 0;
+      clear();
     }
 
     public ReadStatistics(ReadStatistics rhs) {
@@ -203,6 +200,13 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       this.totalShortCircuitBytesRead += amt;
       this.totalZeroCopyBytesRead += amt;
     }
+
+    void clear() {
+      this.totalBytesRead = 0;
+      this.totalLocalBytesRead = 0;
+      this.totalShortCircuitBytesRead = 0;
+      this.totalZeroCopyBytesRead = 0;
+    }
     
     private long totalBytesRead;
 
@@ -412,7 +416,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   /**
    * Return collection of blocks that has already been located.
    */
-  public synchronized List<LocatedBlock> getAllBlocks() throws IOException {
+  public List<LocatedBlock> getAllBlocks() throws IOException {
     return getBlockRange(0, getFileLength());
   }
 
@@ -700,26 +704,28 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * strategy-agnostic.
    */
   private interface ReaderStrategy {
-    public int doRead(BlockReader blockReader, int off, int len,
-        ReadStatistics readStatistics) throws ChecksumException, IOException;
+    public int doRead(BlockReader blockReader, int off, int len)
+        throws ChecksumException, IOException;
   }
 
-  private static void updateReadStatistics(ReadStatistics readStatistics, 
+  private void updateReadStatistics(ReadStatistics readStatistics, 
         int nRead, BlockReader blockReader) {
     if (nRead <= 0) return;
-    if (blockReader.isShortCircuit()) {
-      readStatistics.addShortCircuitBytes(nRead);
-    } else if (blockReader.isLocal()) {
-      readStatistics.addLocalBytes(nRead);
-    } else {
-      readStatistics.addRemoteBytes(nRead);
+    synchronized(infoLock) {
+      if (blockReader.isShortCircuit()) {
+        readStatistics.addShortCircuitBytes(nRead);
+      } else if (blockReader.isLocal()) {
+        readStatistics.addLocalBytes(nRead);
+      } else {
+        readStatistics.addRemoteBytes(nRead);
+      }
     }
   }
   
   /**
    * Used to read bytes into a byte[]
    */
-  private static class ByteArrayStrategy implements ReaderStrategy {
+  private class ByteArrayStrategy implements ReaderStrategy {
     final byte[] buf;
 
     public ByteArrayStrategy(byte[] buf) {
@@ -727,26 +733,26 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     }
 
     @Override
-    public int doRead(BlockReader blockReader, int off, int len,
-            ReadStatistics readStatistics) throws ChecksumException, IOException {
-        int nRead = blockReader.read(buf, off, len);
-        updateReadStatistics(readStatistics, nRead, blockReader);
-        return nRead;
+    public int doRead(BlockReader blockReader, int off, int len)
+          throws ChecksumException, IOException {
+      int nRead = blockReader.read(buf, off, len);
+      updateReadStatistics(readStatistics, nRead, blockReader);
+      return nRead;
     }
   }
 
   /**
    * Used to read bytes into a user-supplied ByteBuffer
    */
-  private static class ByteBufferStrategy implements ReaderStrategy {
+  private class ByteBufferStrategy implements ReaderStrategy {
     final ByteBuffer buf;
     ByteBufferStrategy(ByteBuffer buf) {
       this.buf = buf;
     }
 
     @Override
-    public int doRead(BlockReader blockReader, int off, int len,
-        ReadStatistics readStatistics) throws ChecksumException, IOException {
+    public int doRead(BlockReader blockReader, int off, int len)
+        throws ChecksumException, IOException {
       int oldpos = buf.position();
       int oldlimit = buf.limit();
       boolean success = false;
@@ -785,7 +791,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     while (true) {
       // retry as many times as seekToNewSource allows.
       try {
-        return reader.doRead(blockReader, off, len, readStatistics);
+        return reader.doRead(blockReader, off, len);
       } catch ( ChecksumException ce ) {
         DFSClient.LOG.warn("Found Checksum error for "
             + getCurrentBlock() + " from " + currentNode
@@ -1612,8 +1618,19 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   /**
    * Get statistics about the reads which this DFSInputStream has done.
    */
-  public synchronized ReadStatistics getReadStatistics() {
-    return new ReadStatistics(readStatistics);
+  public ReadStatistics getReadStatistics() {
+    synchronized(infoLock) {
+      return new ReadStatistics(readStatistics);
+    }
+  }
+
+  /**
+   * Clear statistics about the reads which this DFSInputStream has done.
+   */
+  public void clearReadStatistics() {
+    synchronized(infoLock) {
+      readStatistics.clear();
+    }
   }
 
   public FileEncryptionInfo getFileEncryptionInfo() {
@@ -1775,7 +1792,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       buffer.position((int)blockPos);
       buffer.limit((int)(blockPos + length));
       extendedReadBuffers.put(buffer, clientMmap);
-      readStatistics.addZeroCopyBytes(length);
+      synchronized (infoLock) {
+        readStatistics.addZeroCopyBytes(length);
+      }
       if (DFSClient.LOG.isDebugEnabled()) {
         DFSClient.LOG.debug("readZeroCopy read " + length + 
             " bytes from offset " + curPos + " via the zero-copy read " +

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java

@@ -103,7 +103,11 @@ public class HdfsDataInputStream extends FSDataInputStream {
    * be higher than you would expect just by adding up the number of
    * bytes read through HdfsDataInputStream.
    */
-  public synchronized DFSInputStream.ReadStatistics getReadStatistics() {
+  public DFSInputStream.ReadStatistics getReadStatistics() {
     return getDFSInputStream().getReadStatistics();
   }
+
+  public void clearReadStatistics() {
+    getDFSInputStream().clearReadStatistics();
+  }
 }

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.c

@@ -79,6 +79,11 @@ static const struct ExceptionInfo gExceptionInfo[] = {
         0,
         EDQUOT,
     },
+    {
+        "java.lang.UnsupportedOperationException",
+        0,
+        ENOTSUP,
+    },
     {
         "org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException",
         0,

+ 32 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c

@@ -181,7 +181,38 @@ done:
 int64_t hdfsReadStatisticsGetRemoteBytesRead(
                             const struct hdfsReadStatistics *stats)
 {
-  return stats->totalBytesRead - stats->totalLocalBytesRead;
+    return stats->totalBytesRead - stats->totalLocalBytesRead;
+}
+
+int hdfsFileClearReadStatistics(hdfsFile file)
+{
+    jthrowable jthr;
+    int ret;
+    JNIEnv* env = getJNIEnv();
+
+    if (env == NULL) {
+        errno = EINTERNAL;
+        return EINTERNAL;
+    }
+    if (file->type != HDFS_STREAM_INPUT) {
+        ret = EINVAL;
+        goto done;
+    }
+    jthr = invokeMethod(env, NULL, INSTANCE, file->file,
+                  "org/apache/hadoop/hdfs/client/HdfsDataInputStream",
+                  "clearReadStatistics", "()V");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "hdfsFileClearReadStatistics: clearReadStatistics failed");
+        goto done;
+    }
+    ret = 0;
+done:
+    if (ret) {
+        errno = ret;
+        return ret;
+    }
+    return 0;
 }
 
 void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats)

+ 13 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h

@@ -118,6 +118,19 @@ extern  "C" {
     int64_t hdfsReadStatisticsGetRemoteBytesRead(
                             const struct hdfsReadStatistics *stats);
 
+    /**
+     * Clear the read statistics for a file.
+     *
+     * @param file      The file to clear the read statistics of.
+     *
+     * @return          0 on success; the error code otherwise.
+     *                  EINVAL: the file is not open for reading.
+     *                  ENOTSUP: the file does not support clearing the read
+     *                  statistics.
+     *                  Errno will also be set to this code on failure.
+     */
+    int hdfsFileClearReadStatistics(hdfsFile file);
+
     /**
      * Free some HDFS read statistics.
      *

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test_libhdfs_threaded.c

@@ -205,6 +205,10 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs,
     errno = 0;
     EXPECT_UINT64_EQ((uint64_t)expected, readStats->totalBytesRead);
     hdfsFileFreeReadStatistics(readStats);
+    EXPECT_ZERO(hdfsFileClearReadStatistics(file));
+    EXPECT_ZERO(hdfsFileGetReadStatistics(file, &readStats));
+    EXPECT_UINT64_EQ((uint64_t)0, readStats->totalBytesRead);
+    hdfsFileFreeReadStatistics(readStats);
     EXPECT_ZERO(memcmp(paths->prefix, tmp, expected));
     EXPECT_ZERO(hdfsCloseFile(fs, file));