ソースを参照

HADOOP-11601. Enhance FS spec & tests to mandate FileStatus.getBlocksize() >0 for non-empty files. Contributed by Steve Loughran

Mingliang Liu 8 年 前
コミット
ae8849fe37

+ 9 - 8
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md

@@ -78,6 +78,7 @@ Get the status of a path
         if isFile(FS, p) :
             stat.length = len(FS.Files[p])
             stat.isdir = False
+            stat.blockSize > 0
         elif isDir(FS, p) :
             stat.length = 0
             stat.isdir = True
@@ -451,13 +452,13 @@ split calculations to divide work optimally across a set of worker processes.
 
 #### Postconditions
 
-    result = integer >= 0
+    result = integer > 0
 
 Although there is no defined minimum value for this result, as it
 is used to partition work during job submission, a block size
-that is too small will result in either too many jobs being submitted
-for efficient work, or the `JobSubmissionClient` running out of memory.
-
+that is too small will result in badly partitioned workload,
+or even the `JobSubmissionClient` and equivalent
+running out of memory as it calculates the partitions.
 
 Any FileSystem that does not actually break files into blocks SHOULD
 return a number for this that results in efficient processing.
@@ -503,12 +504,12 @@ on the filesystem.
 
 #### Postconditions
 
-
+    if len(FS, P) > 0:  getFileStatus(P).getBlockSize() > 0
     result == getFileStatus(P).getBlockSize()
 
-The outcome of this operation MUST be identical to that contained in
-the `FileStatus` returned from `getFileStatus(P)`.
-
+1. The outcome of this operation MUST be identical to the value of
+   `getFileStatus(P).getBlockSize()`.
+1. By inference, it MUST be > 0 for any file of length > 0.
 
 ## State Changing Operations
 

+ 94 - 2
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCreateTest.java

@@ -21,8 +21,8 @@ package org.apache.hadoop.fs.contract;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
 import org.junit.Test;
 import org.junit.internal.AssumptionViolatedException;
 
@@ -30,16 +30,22 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.getFileStatusEventually;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
 
 /**
- * Test creating files, overwrite options &c
+ * Test creating files, overwrite options etc.
  */
 public abstract class AbstractContractCreateTest extends
                                                  AbstractFSContractTestBase {
 
+  /**
+   * How long to wait for a path to become visible.
+   */
+  public static final int CREATE_TIMEOUT = 15000;
+
   @Test
   public void testCreateNewFile() throws Throwable {
     describe("Foundational 'create a file' test");
@@ -180,4 +186,90 @@ public abstract class AbstractContractCreateTest extends
       }
     }
   }
+
+  @Test
+  public void testCreatedFileIsVisibleOnFlush() throws Throwable {
+    describe("verify that a newly created file exists once a flush has taken "
+        + "place");
+    Path path = path("testCreatedFileIsVisibleOnFlush");
+    FileSystem fs = getFileSystem();
+    try(FSDataOutputStream out = fs.create(path,
+          false,
+          4096,
+          (short) 1,
+          1024)) {
+      out.write('a');
+      out.flush();
+      if (!fs.exists(path)) {
+
+        if (isSupported(IS_BLOBSTORE)) {
+          // object store: downgrade to a skip so that the failure is visible
+          // in test results
+          skip("Filesystem is an object store and newly created files are not "
+              + "immediately visible");
+        }
+        assertPathExists("expected path to be visible before file closed",
+            path);
+      }
+    }
+  }
+
+  @Test
+  public void testCreatedFileIsEventuallyVisible() throws Throwable {
+    describe("verify a written to file is visible after the stream is closed");
+    Path path = path("testCreatedFileIsEventuallyVisible");
+    FileSystem fs = getFileSystem();
+    try(
+      FSDataOutputStream out = fs.create(path,
+          false,
+          4096,
+          (short) 1,
+          1024)
+      ) {
+      out.write(0x01);
+      out.close();
+      getFileStatusEventually(fs, path, CREATE_TIMEOUT);
+    }
+  }
+
+  @Test
+  public void testFileStatusBlocksizeNonEmptyFile() throws Throwable {
+    describe("validate the block size of a filesystem and files within it");
+    FileSystem fs = getFileSystem();
+
+    long rootPath = fs.getDefaultBlockSize(path("/"));
+    assertTrue("Root block size is invalid " + rootPath,
+        rootPath > 0);
+
+    Path path = path("testFileStatusBlocksizeNonEmptyFile");
+    byte[] data = dataset(256, 'a', 'z');
+
+    writeDataset(fs, path, data, data.length, 1024 * 1024, false);
+
+    validateBlockSize(fs, path, 1);
+  }
+
+  @Test
+  public void testFileStatusBlocksizeEmptyFile() throws Throwable {
+    describe("check that an empty file may return a 0-byte blocksize");
+    FileSystem fs = getFileSystem();
+    Path path = path("testFileStatusBlocksizeEmptyFile");
+    ContractTestUtils.touch(fs, path);
+    validateBlockSize(fs, path, 0);
+  }
+
+  private void validateBlockSize(FileSystem fs, Path path, int minValue)
+      throws IOException, InterruptedException {
+    FileStatus status =
+        getFileStatusEventually(fs, path, CREATE_TIMEOUT);
+    String statusDetails = status.toString();
+    assertTrue("File status block size too low:  " + statusDetails
+            + " min value: " + minValue,
+        status.getBlockSize() >= minValue);
+    long defaultBlockSize = fs.getDefaultBlockSize(path);
+    assertTrue("fs.getDefaultBlockSize(" + path + ") size " +
+            defaultBlockSize + " is below the minimum of " + minValue,
+        defaultBlockSize >= minValue);
+  }
+
 }

+ 31 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java

@@ -1097,6 +1097,36 @@ public class ContractTestUtils extends Assert {
     return new HashSet<>(paths).size() != paths.size();
   }
 
+  /**
+   * Get the status of a path eventually, even if the FS doesn't have create
+   * consistency. If the path is not there by the time the timeout completes,
+   * an assertion is raised.
+   * @param fs FileSystem
+   * @param path path to look for
+   * @param timeout timeout in milliseconds
+   * @return the status
+   * @throws IOException if an I/O error occurs while writing or reading the
+   * test file <i>other than file not found</i>
+   */
+  public static FileStatus getFileStatusEventually(FileSystem fs, Path path,
+      int timeout) throws IOException, InterruptedException {
+    long endTime = System.currentTimeMillis() + timeout;
+    FileStatus stat = null;
+    do {
+      try {
+        stat = fs.getFileStatus(path);
+      } catch (FileNotFoundException e) {
+        if (System.currentTimeMillis() > endTime) {
+          // timeout, raise an assert with more diagnostics
+          assertPathExists(fs, "Path not found after " + timeout + " mS", path);
+        } else {
+          Thread.sleep(50);
+        }
+      }
+    } while (stat == null);
+    return stat;
+  }
+
   /**
    * Recursively list all entries, with a depth first traversal of the
    * directory tree.
@@ -1471,4 +1501,5 @@ public class ContractTestUtils extends Assert {
       return endTime;
     }
   }
+
 }