瀏覽代碼

HADOOP-16107. Update ChecksumFileSystem createFile/openFile API to generate checksum.
Contributed by Steve Loughran

Eric Yang 6 年之前
父節點
當前提交
feccd282fe

+ 86 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java

@@ -24,15 +24,22 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.channels.ClosedChannelException;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
+import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.LambdaUtils;
 import org.apache.hadoop.util.Progressable;
 
 /****************************************************************
@@ -484,6 +491,32 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
         blockSize, progress);
   }
 
+  @Override
+  public FSDataOutputStream create(final Path f,
+      final FsPermission permission,
+      final EnumSet<CreateFlag> flags,
+      final int bufferSize,
+      final short replication,
+      final long blockSize,
+      final Progressable progress,
+      final Options.ChecksumOpt checksumOpt) throws IOException {
+    return create(f, permission, flags.contains(CreateFlag.OVERWRITE),
+        bufferSize, replication, blockSize, progress);
+  }
+
+  @Override
+  public FSDataOutputStream createNonRecursive(final Path f,
+      final FsPermission permission,
+      final EnumSet<CreateFlag> flags,
+      final int bufferSize,
+      final short replication,
+      final long blockSize,
+      final Progressable progress) throws IOException {
+    return create(f, permission, flags.contains(CreateFlag.OVERWRITE),
+        false, bufferSize, replication,
+        blockSize, progress);
+  }
+
   abstract class FsOperation {
     boolean run(Path p) throws IOException {
       boolean status = apply(p);
@@ -780,4 +813,57 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
                                        long inPos, FSDataInputStream sums, long sumsPos) {
     return false;
   }
+
+  /**
+   * This is overridden to ensure that this class's
+   * {@link #openFileWithOptions}() method is called, and so ultimately
+   * its {@link #open(Path, int)}.
+   *
+   * {@inheritDoc}
+   */
+  @Override
+  public FutureDataInputStreamBuilder openFile(final Path path)
+      throws IOException, UnsupportedOperationException {
+    return ((FutureDataInputStreamBuilderImpl)
+        createDataInputStreamBuilder(this, path)).getThisBuilder();
+  }
+
+  /**
+   * Open the file as a blocking call to {@link #open(Path, int)}.
+   *
+   * {@inheritDoc}
+   */
+  @Override
+  protected CompletableFuture<FSDataInputStream> openFileWithOptions(
+      final Path path,
+      final Set<String> mandatoryKeys,
+      final Configuration options,
+      final int bufferSize) throws IOException {
+    AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(mandatoryKeys,
+        Collections.emptySet(),
+        "for " + path);
+    return LambdaUtils.eval(
+        new CompletableFuture<>(), () -> open(path, bufferSize));
+  }
+
+  /**
+   * This is overridden to ensure that this class's create() method is
+   * ultimately called.
+   *
+   * {@inheritDoc}
+   */
+  public FSDataOutputStreamBuilder createFile(Path path) {
+    return createDataOutputStreamBuilder(this, path)
+        .create().overwrite(true);
+  }
+
+  /**
+   * This is overridden to ensure that this class's create() method is
+   * ultimately called.
+   *
+   * {@inheritDoc}
+   */
+  public FSDataOutputStreamBuilder appendFile(Path path) {
+    return createDataOutputStreamBuilder(this, path).append();
+  }
 }

+ 55 - 5
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

@@ -4240,14 +4240,34 @@ public abstract class FileSystem extends Configured
     return GlobalStorageStatistics.INSTANCE;
   }
 
+  /**
+   * Create instance of the standard FSDataOutputStreamBuilder for the
+   * given filesystem and path.
+   * @param fileSystem owner
+   * @param path path to create
+   * @return a builder.
+   */
+  @InterfaceStability.Unstable
+  protected static FSDataOutputStreamBuilder createDataOutputStreamBuilder(
+      @Nonnull final FileSystem fileSystem,
+      @Nonnull final Path path) {
+    return new FileSystemDataOutputStreamBuilder(fileSystem, path);
+  }
+
+  /**
+   * Standard implementation of the FSDataOutputStreamBuilder; invokes
+   * create/createNonRecursive or Append depending upon the options.
+   */
   private static final class FileSystemDataOutputStreamBuilder extends
       FSDataOutputStreamBuilder<FSDataOutputStream,
         FileSystemDataOutputStreamBuilder> {
 
     /**
      * Constructor.
+     * @param fileSystem owner
+     * @param p path to create
      */
-    protected FileSystemDataOutputStreamBuilder(FileSystem fileSystem, Path p) {
+    private FileSystemDataOutputStreamBuilder(FileSystem fileSystem, Path p) {
       super(fileSystem, p);
     }
 
@@ -4290,7 +4310,7 @@ public abstract class FileSystem extends Configured
    * builder interface becomes stable.
    */
   public FSDataOutputStreamBuilder createFile(Path path) {
-    return new FileSystemDataOutputStreamBuilder(this, path)
+    return createDataOutputStreamBuilder(this, path)
         .create().overwrite(true);
   }
 
@@ -4300,7 +4320,7 @@ public abstract class FileSystem extends Configured
    * @return a {@link FSDataOutputStreamBuilder} to build file append request.
    */
   public FSDataOutputStreamBuilder appendFile(Path path) {
-    return new FileSystemDataOutputStreamBuilder(this, path).append();
+    return createDataOutputStreamBuilder(this, path).append();
   }
 
   /**
@@ -4321,7 +4341,7 @@ public abstract class FileSystem extends Configured
   @InterfaceStability.Unstable
   public FutureDataInputStreamBuilder openFile(Path path)
       throws IOException, UnsupportedOperationException {
-    return new FSDataInputStreamBuilder(this, path).getThisBuilder();
+    return createDataInputStreamBuilder(this, path).getThisBuilder();
   }
 
   /**
@@ -4340,7 +4360,7 @@ public abstract class FileSystem extends Configured
   @InterfaceStability.Unstable
   public FutureDataInputStreamBuilder openFile(PathHandle pathHandle)
       throws IOException, UnsupportedOperationException {
-    return new FSDataInputStreamBuilder(this, pathHandle)
+    return createDataInputStreamBuilder(this, pathHandle)
         .getThisBuilder();
   }
 
@@ -4416,6 +4436,36 @@ public abstract class FileSystem extends Configured
     return result;
   }
 
+  /**
+   * Create instance of the standard {@link FSDataInputStreamBuilder} for the
+   * given filesystem and path.
+   * @param fileSystem owner
+   * @param path path to read
+   * @return a builder.
+   */
+  @InterfaceAudience.LimitedPrivate("Filesystems")
+  @InterfaceStability.Unstable
+  protected static FSDataInputStreamBuilder createDataInputStreamBuilder(
+      @Nonnull final FileSystem fileSystem,
+      @Nonnull final Path path) {
+    return new FSDataInputStreamBuilder(fileSystem, path);
+  }
+
+  /**
+   * Create instance of the standard {@link FSDataInputStreamBuilder} for the
+   * given filesystem and path handle.
+   * @param fileSystem owner
+   * @param pathHandle path handle of file to open.
+   * @return a builder.
+   */
+  @InterfaceAudience.LimitedPrivate("Filesystems")
+  @InterfaceStability.Unstable
+  protected static FSDataInputStreamBuilder createDataInputStreamBuilder(
+      @Nonnull final FileSystem fileSystem,
+      @Nonnull final PathHandle pathHandle) {
+    return new FSDataInputStreamBuilder(fileSystem, pathHandle);
+  }
+
   /**
    * Builder returned for {@code #openFile(Path)}
    * and {@code #openFile(PathHandle)}.

+ 12 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFileSystem.java

@@ -32,12 +32,14 @@ import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FilterFileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.QuotaUsage;
@@ -464,4 +466,14 @@ class ChRootedFileSystem extends FilterFileSystem {
     super.unsetStoragePolicy(fullPath(src));
   }
 
+  @Override
+  public FSDataOutputStreamBuilder createFile(final Path path) {
+    return super.createFile(fullPath(path));
+  }
+
+  @Override
+  public FutureDataInputStreamBuilder openFile(final Path path)
+      throws IOException, UnsupportedOperationException {
+    return super.openFile(fullPath(path));
+  }
 }

+ 227 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java

@@ -35,10 +35,12 @@ import java.io.*;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
 import static org.apache.hadoop.test.PlatformAssumptions.assumeWindows;
@@ -774,4 +776,229 @@ public class TestLocalFileSystem {
         "unsupported key found", builder::build
     );
   }
+
+  private static final int CRC_SIZE = 12;
+
+  private static final byte[] DATA = "1234567890".getBytes();
+
+  /**
+   * Get the statistics for the file schema. Contains assertions
+   * @return the statistics on all file:// IO.
+   */
+  protected Statistics getFileStatistics() {
+    final List<Statistics> all = FileSystem.getAllStatistics();
+    final List<Statistics> fileStats = all
+        .stream()
+        .filter(s -> s.getScheme().equals("file"))
+        .collect(Collectors.toList());
+    assertEquals("Number of statistics counters for file://",
+        1, fileStats.size());
+    // this should be used for local and rawLocal, as they share the
+    // same schema (although their class is different)
+    return fileStats.get(0);
+  }
+
+  /**
+   * Write the byte array {@link #DATA} to the given output stream.
+   * @param s stream to write to.
+   * @throws IOException failure to write/close the file
+   */
+  private void writeData(FSDataOutputStream s) throws IOException {
+    s.write(DATA);
+    s.close();
+  }
+
+  /**
+   * Evaluate the closure while counting bytes written during
+   * its execution, and verify that the count included the CRC
+   * write as well as the data.
+   * After the operation, the file is deleted.
+   * @param operation operation for assertion method.
+   * @param path path to write
+   * @param callable expression evaluated
+   * @param delete should the file be deleted after?
+   */
+  private void assertWritesCRC(String operation, Path path,
+      LambdaTestUtils.VoidCallable callable, boolean delete) throws Exception {
+    final Statistics stats = getFileStatistics();
+    final long bytesOut0 = stats.getBytesWritten();
+    try {
+      callable.call();
+      assertEquals("Bytes written in " + operation + "; stats=" + stats,
+          CRC_SIZE + DATA.length, stats.getBytesWritten() - bytesOut0);
+    } finally {
+      if (delete) {
+        // clean up
+        try {
+          fileSys.delete(path, false);
+        } catch (IOException ignored) {
+          // ignore this cleanup failure
+        }
+      }
+    }
+  }
+
+  /**
+   * Verify that File IO through the classic non-builder APIs generate
+   * statistics which imply that CRCs were read and written.
+   */
+  @Test
+  public void testCRCwithClassicAPIs() throws Throwable {
+    final Path file = new Path(TEST_ROOT_DIR, "testByteCountersClassicAPIs");
+    assertWritesCRC("create()",
+        file,
+        () -> writeData(fileSys.create(file, true)),
+        false);
+
+    final Statistics stats = getFileStatistics();
+    final long bytesRead0 = stats.getBytesRead();
+    fileSys.open(file).close();
+    final long bytesRead1 = stats.getBytesRead();
+    assertEquals("Bytes read in open() call with stats " + stats,
+        CRC_SIZE, bytesRead1 - bytesRead0);
+  }
+
+  /**
+   * create/7 to use write the CRC.
+   */
+  @Test
+  public void testCRCwithCreate7() throws Throwable {
+    final Path file = new Path(TEST_ROOT_DIR, "testCRCwithCreate7");
+    assertWritesCRC("create/7",
+        file,
+        () -> writeData(
+            fileSys.create(file,
+                FsPermission.getFileDefault(),
+                true,
+                8192,
+                (short)1,
+                16384,
+                null)),
+        true);
+  }
+
+  /**
+   * Create with ChecksumOpt to create checksums.
+   * If the LocalFS ever interpreted the flag, this test may fail.
+   */
+  @Test
+  public void testCRCwithCreateChecksumOpt() throws Throwable {
+    final Path file = new Path(TEST_ROOT_DIR, "testCRCwithCreateChecksumOpt");
+    assertWritesCRC("create with checksum opt",
+        file,
+        () -> writeData(
+            fileSys.create(file,
+                FsPermission.getFileDefault(),
+                EnumSet.of(CreateFlag.CREATE),
+                8192,
+                (short)1,
+                16384,
+                null,
+                Options.ChecksumOpt.createDisabled())),
+        true);
+  }
+
+  /**
+   * Create createNonRecursive/6.
+   */
+  @Test
+  public void testCRCwithCreateNonRecursive6() throws Throwable {
+    fileSys.mkdirs(TEST_PATH);
+    final Path file = new Path(TEST_ROOT_DIR,
+        "testCRCwithCreateNonRecursive6");
+    assertWritesCRC("create with checksum opt",
+        file,
+        () -> writeData(
+            fileSys.createNonRecursive(file,
+                FsPermission.getFileDefault(),
+                true,
+                8192,
+                (short)1,
+                16384,
+                null)),
+        true);
+  }
+
+  /**
+   * Create createNonRecursive with CreateFlags.
+   */
+  @Test
+  public void testCRCwithCreateNonRecursiveCreateFlags() throws Throwable {
+    fileSys.mkdirs(TEST_PATH);
+    final Path file = new Path(TEST_ROOT_DIR,
+        "testCRCwithCreateNonRecursiveCreateFlags");
+    assertWritesCRC("create with checksum opt",
+        file,
+        () -> writeData(
+            fileSys.createNonRecursive(file,
+                FsPermission.getFileDefault(),
+                EnumSet.of(CreateFlag.CREATE),
+                8192,
+                (short)1,
+                16384,
+                null)),
+        true);
+  }
+
+
+  /**
+   * This relates to MAPREDUCE-7184, where the openFile() call's
+   * CRC count wasn't making into the statistics for the current thread.
+   * If the evaluation was in a separate thread you'd expect that,
+   * but if the completable future is in fact being synchronously completed
+   * it should not happen.
+   */
+  @Test
+  public void testReadIncludesCRCwithBuilders() throws Throwable {
+
+    final Path file = new Path(TEST_ROOT_DIR,
+        "testReadIncludesCRCwithBuilders");
+    Statistics stats = getFileStatistics();
+    // write the file using the builder API
+    assertWritesCRC("createFile()",
+        file,
+        () -> writeData(
+            fileSys.createFile(file)
+                .overwrite(true).recursive()
+                .build()),
+        false);
+
+    // now read back the data, again with the builder API
+    final long bytesRead0 = stats.getBytesRead();
+    fileSys.openFile(file).build().get().close();
+    assertEquals("Bytes read in openFile() call with stats " + stats,
+        CRC_SIZE, stats.getBytesRead() - bytesRead0);
+    // now write with overwrite = true
+    assertWritesCRC("createFileNonRecursive()",
+        file,
+        () -> {
+          try (FSDataOutputStream s = fileSys.createFile(file)
+              .overwrite(true)
+              .build()) {
+            s.write(DATA);
+          }
+        },
+        true);
+  }
+
+  /**
+   * Write with the builder, using the normal recursive create
+   * with create flags containing the overwrite option.
+   */
+  @Test
+  public void testWriteWithBuildersRecursive() throws Throwable {
+
+    final Path file = new Path(TEST_ROOT_DIR,
+        "testWriteWithBuildersRecursive");
+    Statistics stats = getFileStatistics();
+    // write the file using the builder API
+    assertWritesCRC("createFile()",
+        file,
+        () -> writeData(
+            fileSys.createFile(file)
+                .overwrite(false)
+                .recursive()
+                .build()),
+        true);
+  }
 }

+ 76 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestFutureIO.java

@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.test.HadoopTestBase;
+import org.apache.hadoop.util.LambdaUtils;
+
+/**
+ * Test behavior of {@link FutureIOSupport}, especially "what thread do things
+ * happen in?".
+ */
+public class TestFutureIO extends HadoopTestBase {
+
+  private ThreadLocal<AtomicInteger> local;
+
+  @Before
+  public void setup() throws Exception {
+    local = ThreadLocal.withInitial(() -> new AtomicInteger(1));
+  }
+
+  /**
+   * Simple eval is blocking and executes in the same thread.
+   */
+  @Test
+  public void testEvalInCurrentThread() throws Throwable {
+    CompletableFuture<Integer> result = new CompletableFuture<>();
+    CompletableFuture<Integer> eval = LambdaUtils.eval(result,
+        () -> {
+          return getLocal().addAndGet(2);
+        });
+    assertEquals("Thread local value", 3, getLocalValue());
+    assertEquals("Evaluated Value", 3, eval.get().intValue());
+  }
+
+  /**
+   * A supply async call runs things in a shared thread pool.
+   */
+  @Test
+  public void testEvalAsync() throws Throwable {
+    final CompletableFuture<Integer> eval = CompletableFuture.supplyAsync(
+        () -> getLocal().addAndGet(2));
+    assertEquals("Thread local value", 1, getLocalValue());
+    assertEquals("Evaluated Value", 3, eval.get().intValue());
+  }
+
+
+  protected AtomicInteger getLocal() {
+    return local.get();
+  }
+
+  protected int getLocalValue() {
+    return local.get().get();
+  }
+}