浏览代码

HADOOP-17945. JsonSerialization raises EOFException reading JSON data stored on google GCS (#3501)

Contributed By: Steve Loughran
Steve Loughran 3 年之前
父节点
当前提交
b8f3e54ff7

+ 7 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathIOException.java

@@ -64,7 +64,13 @@ public class PathIOException extends IOException {
     this.path = path;
   }
 
-  protected PathIOException(String path, String error, Throwable cause) {
+  /**
+   * Use a subclass of PathIOException if possible.
+   * @param path for the exception
+   * @param error custom string to use an the error text
+   * @param cause cause of exception.
+   */
+  public PathIOException(String path, String error, Throwable cause) {
     super(error, cause);
     this.path = path;
   }

+ 37 - 17
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JsonSerialization.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.util;
 
+import javax.annotation.Nullable;
 import java.io.EOFException;
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -43,8 +44,13 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
 
 /**
  * Support for marshalling objects to and from JSON.
@@ -229,30 +235,44 @@ public class JsonSerialization<T> {
 
   /**
    * Load from a Hadoop filesystem.
-   * There's a check for data availability after the file is open, by
-   * raising an EOFException if stream.available == 0.
-   * This allows for a meaningful exception without the round trip overhead
-   * of a getFileStatus call before opening the file. It may be brittle
-   * against an FS stream which doesn't return a value here, but the
-   * standard filesystems all do.
-   * JSON parsing and mapping problems
-   * are converted to IOEs.
    * @param fs filesystem
    * @param path path
    * @return a loaded object
-   * @throws IOException IO or JSON parse problems
+   * @throws PathIOException JSON parse problem
+   * @throws IOException IO problems
    */
   public T load(FileSystem fs, Path path) throws IOException {
-    try (FSDataInputStream dataInputStream = fs.open(path)) {
-      // throw an EOF exception if there is no data available.
-      if (dataInputStream.available() == 0) {
-        throw new EOFException("No data in " + path);
-      }
+    return load(fs, path, null);
+  }
+
+  /**
+   * Load from a Hadoop filesystem.
+   * If a file status is supplied, it's passed in to the openFile()
+   * call so that FS implementations can optimize their opening.
+   * @param fs filesystem
+   * @param path path
+   * @param status status of the file to open.
+   * @return a loaded object
+   * @throws PathIOException JSON parse problem
+   * @throws EOFException file status references an empty file
+   * @throws IOException IO problems
+   */
+  public T load(FileSystem fs, Path path, @Nullable FileStatus status)
+      throws IOException {
+
+    if (status != null && status.getLen() == 0) {
+      throw new EOFException("No data in " + path);
+    }
+    FutureDataInputStreamBuilder builder = fs.openFile(path);
+    if (status != null) {
+      builder.withFileStatus(status);
+    }
+    try (FSDataInputStream dataInputStream =
+             awaitFuture(builder.build())) {
       return fromJsonStream(dataInputStream);
     } catch (JsonProcessingException e) {
-      throw new IOException(
-          String.format("Failed to read JSON file \"%s\": %s", path, e),
-          e);
+      throw new PathIOException(path.toString(),
+          "Failed to read JSON file " + e, e);
     }
   }
 

+ 36 - 2
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestJsonSerialization.java

@@ -28,9 +28,11 @@ import com.fasterxml.jackson.core.JsonParseException;
 import org.junit.Test;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.test.HadoopTestBase;
 import org.apache.hadoop.test.LambdaTestUtils;
 
@@ -151,6 +153,9 @@ public class TestJsonSerialization extends HadoopTestBase {
     }
   }
 
+  /**
+   * round trip through both load APIs.
+   */
   @Test
   public void testFileSystemRoundTrip() throws Throwable {
     File tempFile = File.createTempFile("Keyval", ".json");
@@ -159,19 +164,30 @@ public class TestJsonSerialization extends HadoopTestBase {
     LocalFileSystem fs = FileSystem.getLocal(new Configuration());
     try {
       serDeser.save(fs, tempPath, source, false);
-      assertEquals(source, serDeser.load(fs, tempPath));
+      assertEquals("JSON loaded with load(fs, path)",
+          source,
+          serDeser.load(fs, tempPath));
+      assertEquals("JSON loaded with load(fs, path, status)",
+          source,
+          serDeser.load(fs, tempPath, fs.getFileStatus(tempPath)));
     } finally {
       fs.delete(tempPath, false);
     }
   }
 
+  /**
+   * 0 byte file through the load(path) API will fail with a wrapped
+   * Parser exception.
+   * 0 byte file through the load(path, status) API will fail with a wrapped
+   * Parser exception.
+   */
   @Test
   public void testFileSystemEmptyPath() throws Throwable {
     File tempFile = File.createTempFile("Keyval", ".json");
     Path tempPath = new Path(tempFile.toURI());
     LocalFileSystem fs = FileSystem.getLocal(new Configuration());
     try {
-      LambdaTestUtils.intercept(EOFException.class,
+      LambdaTestUtils.intercept(PathIOException.class,
           () -> serDeser.load(fs, tempPath));
       fs.delete(tempPath, false);
       LambdaTestUtils.intercept(FileNotFoundException.class,
@@ -181,5 +197,23 @@ public class TestJsonSerialization extends HadoopTestBase {
     }
   }
 
+  /**
+   * 0 byte file through the load(path, status) API will fail with an
+   * EOFException.
+   */
+  @Test
+  public void testFileSystemEmptyStatus() throws Throwable {
+    File tempFile = File.createTempFile("Keyval", ".json");
+    Path tempPath = new Path(tempFile.toURI());
+    LocalFileSystem fs = FileSystem.getLocal(new Configuration());
+    try {
+      final FileStatus st = fs.getFileStatus(tempPath);
+      LambdaTestUtils.intercept(EOFException.class,
+          () -> serDeser.load(fs, tempPath, st));
+    } finally {
+      fs.delete(tempPath, false);
+    }
+  }
+
 
 }