소스 검색

HADOOP-14747. S3AInputStream to implement CanUnbuffer.

Author:    Sahil Takiar <stakiar@cloudera.com>
Sahil Takiar 6 년 전
부모
커밋
2382f63fc0

+ 37 - 0
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md

@@ -275,6 +275,43 @@ class, which can react to a checksum error in a read by attempting to source
 the data elsewhere. If a new source can be found it attempts to reread and
 recheck that portion of the file.
 
+### `CanUnbuffer.unbuffer()`
+
+This operation instructs the source to release any system resources they are
+currently holding on to, such as buffers, sockets, file descriptors, etc. Any
+subsequent IO operation will likely have to reacquire these resources.
+Unbuffering is useful in situation where streams need to remain open, but no IO
+operation is expected from the stream in the immediate future (examples include
+file handle cacheing).
+
+#### Preconditions
+
+Not all subclasses implement this operation. In addition to implementing
+`CanUnbuffer`. Subclasses must implement the `StreamCapabilities` interface and
+`StreamCapabilities.hasCapability(UNBUFFER)` must return true. If a subclass
+implements `CanUnbuffer` but does not report the functionality via
+`StreamCapabilities` then the call to `unbuffer` does nothing. If a subclass
+reports that it does implement `UNBUFFER`, but does not implement the
+`CanUnbuffer` interface, an `UnsupportedOperationException` is thrown.
+
+    supported(FSDIS, StreamCapabilities.hasCapability && FSDIS.hasCapability(UNBUFFER) && CanUnbuffer.unbuffer)
+
+This method is not thread-safe. If `unbuffer` is called while a `read` is in
+progress, the outcome is undefined.
+
+`unbuffer` can be called on a closed file, in which case `unbuffer` will do
+nothing.
+
+#### Postconditions
+
+The majority of subclasses that do not implement this operation simply
+do nothing.
+
+If the operation is supported, `unbuffer` releases any and all system resources
+associated with the stream. The exact list of what these resources are is
+generally implementation dependent, however, in general, it may include
+buffers, sockets, file descriptors, etc.
+
 ## <a name="PositionedReadable"></a> interface `PositionedReadable`
 
 The `PositionedReadable` operations supply "positioned reads" ("pread").

+ 125 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractUnbufferTest.java

@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+
+/**
+ * Contract tests for {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer}.
+ */
+public abstract class AbstractContractUnbufferTest extends AbstractFSContractTestBase {
+
+  private Path file;
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    skipIfUnsupported(SUPPORTS_UNBUFFER);
+    file = path("unbufferFile");
+    createFile(getFileSystem(), file, true,
+            dataset(TEST_FILE_LEN, 0, 255));
+  }
+
+  @Test
+  public void testUnbufferAfterRead() throws IOException {
+    describe("unbuffer a file after a single read");
+    try (FSDataInputStream stream = getFileSystem().open(file)) {
+      assertEquals(128, stream.read(new byte[128]));
+      unbuffer(stream);
+    }
+  }
+
+  @Test
+  public void testUnbufferBeforeRead() throws IOException {
+    describe("unbuffer a file before a read");
+    try (FSDataInputStream stream = getFileSystem().open(file)) {
+      unbuffer(stream);
+      assertEquals(128, stream.read(new byte[128]));
+    }
+  }
+
+  @Test
+  public void testUnbufferEmptyFile() throws IOException {
+    Path emptyFile = path("emptyUnbufferFile");
+    createFile(getFileSystem(), emptyFile, true,
+            dataset(TEST_FILE_LEN, 0, 255));
+    describe("unbuffer an empty file");
+    try (FSDataInputStream stream = getFileSystem().open(emptyFile)) {
+      unbuffer(stream);
+    }
+  }
+
+  @Test
+  public void testUnbufferOnClosedFile() throws IOException {
+    describe("unbuffer a file before a read");
+    FSDataInputStream stream = null;
+    try {
+      stream = getFileSystem().open(file);
+      assertEquals(128, stream.read(new byte[128]));
+    } finally {
+      if (stream != null) {
+        stream.close();
+      }
+    }
+    unbuffer(stream);
+  }
+
+  @Test
+  public void testMultipleUnbuffers() throws IOException {
+    describe("unbuffer a file multiple times");
+    try (FSDataInputStream stream = getFileSystem().open(file)) {
+      unbuffer(stream);
+      unbuffer(stream);
+      assertEquals(128, stream.read(new byte[128]));
+      unbuffer(stream);
+      unbuffer(stream);
+    }
+  }
+
+   @Test
+  public void testUnbufferMultipleReads() throws IOException {
+    describe("unbuffer a file multiple times");
+    try (FSDataInputStream stream = getFileSystem().open(file)) {
+      unbuffer(stream);
+      assertEquals(128, stream.read(new byte[128]));
+      unbuffer(stream);
+      assertEquals(128, stream.read(new byte[128]));
+      assertEquals(128, stream.read(new byte[128]));
+      unbuffer(stream);
+      assertEquals(128, stream.read(new byte[128]));
+      assertEquals(128, stream.read(new byte[128]));
+      assertEquals(128, stream.read(new byte[128]));
+      unbuffer(stream);
+    }
+  }
+
+  private void unbuffer(FSDataInputStream stream) throws IOException {
+    long pos = stream.getPos();
+    stream.unbuffer();
+    assertEquals(pos, stream.getPos());
+  }
+}

+ 5 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java

@@ -201,6 +201,11 @@ public interface ContractOptions {
    */
   String SUPPORTS_CONTENT_CHECK = "supports-content-check";
 
+  /**
+   * Indicates that FS supports unbuffer.
+   */
+  String SUPPORTS_UNBUFFER = "supports-unbuffer";
+
   /**
    * Maximum path length
    * {@value}

+ 46 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractUnbuffer.java

@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractUnbufferTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+
+public class TestHDFSContractUnbuffer extends AbstractContractUnbufferTest {
+
+  @BeforeClass
+  public static void createCluster() throws IOException {
+    HDFSContract.createCluster();
+  }
+
+  @AfterClass
+  public static void teardownCluster() throws IOException {
+    HDFSContract.destroyCluster();
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new HDFSContract(conf);
+  }
+}

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/resources/contract/hdfs.xml

@@ -111,4 +111,9 @@
     <value>true</value>
   </property>
 
+  <property>
+    <name>fs.contract.supports-unbuffer</name>
+    <value>true</value>
+  </property>
+
 </configuration>

+ 32 - 7
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

@@ -25,14 +25,17 @@ import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
 import com.amazonaws.services.s3.model.SSECustomerKey;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.CanSetReadahead;
+import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
 
 import org.slf4j.Logger;
@@ -43,6 +46,7 @@ import java.io.IOException;
 import java.net.SocketTimeoutException;
 
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
+import static org.apache.hadoop.util.StringUtils.toLowerCase;
 
 /**
  * The input stream for an S3A object.
@@ -63,7 +67,8 @@ import static org.apache.commons.lang3.StringUtils.isNotEmpty;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class S3AInputStream extends FSInputStream implements CanSetReadahead {
+public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
+        CanUnbuffer, StreamCapabilities {
 
   public static final String E_NEGATIVE_READAHEAD_VALUE
       = "Negative readahead value";
@@ -175,7 +180,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
   private synchronized void reopen(String reason, long targetPos, long length,
           boolean forceAbort) throws IOException {
 
-    if (wrappedStream != null) {
+    if (isObjectStreamOpen()) {
       closeStream("reopen(" + reason + ")", contentRangeFinish, forceAbort);
     }
 
@@ -542,7 +547,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
    */
   @Retries.OnceRaw
   private void closeStream(String reason, long length, boolean forceAbort) {
-    if (wrappedStream != null) {
+    if (isObjectStreamOpen()) {
 
       // if the amount of data remaining in the current request is greater
       // than the readahead value: abort.
@@ -605,12 +610,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
   @InterfaceStability.Unstable
   public synchronized boolean resetConnection() throws IOException {
     checkNotClosed();
-    boolean connectionOpen = wrappedStream != null;
-    if (connectionOpen) {
+    if (isObjectStreamOpen()) {
       LOG.info("Forced reset of connection to {}", uri);
       closeStream("reset()", contentRangeFinish, true);
     }
-    return connectionOpen;
+    return isObjectStreamOpen();
   }
 
   @Override
@@ -677,7 +681,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
           "S3AInputStream{");
       sb.append(uri);
       sb.append(" wrappedStream=")
-          .append(wrappedStream != null ? "open" : "closed");
+          .append(isObjectStreamOpen() ? "open" : "closed");
       sb.append(" read policy=").append(inputPolicy);
       sb.append(" pos=").append(pos);
       sb.append(" nextReadPos=").append(nextReadPos);
@@ -814,4 +818,25 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
       return readahead;
     }
   }
+
+  @Override
+  public synchronized void unbuffer() {
+    closeStream("unbuffer()", contentRangeFinish, false);
+  }
+
+  @Override
+  public boolean hasCapability(String capability) {
+    switch (toLowerCase(capability)) {
+    case StreamCapabilities.READAHEAD:
+    case StreamCapabilities.UNBUFFER:
+      return true;
+    default:
+      return false;
+    }
+  }
+
+  @VisibleForTesting
+  boolean isObjectStreamOpen() {
+    return wrappedStream != null;
+  }
 }

+ 41 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractUnbuffer.java

@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractUnbufferTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
+
+public class ITestS3AContractUnbuffer extends AbstractContractUnbufferTest {
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    // patch in S3Guard options
+    maybeEnableS3Guard(conf);
+    return conf;
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+}

+ 66 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java

@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Integration test for calling
+ * {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer} on {@link S3AInputStream}.
+ * Validates that the object has been closed using the
+ * {@link S3AInputStream#isObjectStreamOpen()} method. Unlike the
+ * {@link org.apache.hadoop.fs.contract.s3a.ITestS3AContractUnbuffer} tests,
+ * these tests leverage the fact that isObjectStreamOpen exposes if the
+ * underlying stream has been closed or not.
+ */
+public class ITestS3AUnbuffer extends AbstractS3ATestBase {
+
+  @Test
+  public void testUnbuffer() throws IOException {
+    // Setup test file
+    Path dest = path("testUnbuffer");
+    describe("testUnbuffer");
+    try (FSDataOutputStream outputStream = getFileSystem().create(dest, true)) {
+      byte[] data = ContractTestUtils.dataset(16, 'a', 26);
+      outputStream.write(data);
+    }
+
+    // Open file, read half the data, and then call unbuffer
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      assertTrue(inputStream.getWrappedStream() instanceof S3AInputStream);
+      assertEquals(8, inputStream.read(new byte[8]));
+      assertTrue(isObjectStreamOpen(inputStream));
+      inputStream.unbuffer();
+
+      // Check the the wrapped stream is closed
+      assertFalse(isObjectStreamOpen(inputStream));
+    }
+  }
+
+  private boolean isObjectStreamOpen(FSDataInputStream inputStream) {
+    return ((S3AInputStream) inputStream.getWrappedStream()).isObjectStreamOpen();
+  }
+}

+ 76 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java

@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Date;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Uses mocks to check that the {@link S3ObjectInputStream} is closed when
+ * {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer} is called. Unlike the
+ * other unbuffer tests, this specifically tests that the underlying S3 object
+ * stream is closed.
+ */
+public class TestS3AUnbuffer extends AbstractS3AMockTest {
+
+  @Test
+  public void testUnbuffer() throws IOException {
+    // Create mock ObjectMetadata for getFileStatus()
+    Path path = new Path("/file");
+    ObjectMetadata meta = mock(ObjectMetadata.class);
+    when(meta.getContentLength()).thenReturn(1L);
+    when(meta.getLastModified()).thenReturn(new Date(2L));
+    when(meta.getETag()).thenReturn("mock-etag");
+    when(s3.getObjectMetadata(any())).thenReturn(meta);
+
+    // Create mock S3ObjectInputStream and S3Object for open()
+    S3ObjectInputStream objectStream = mock(S3ObjectInputStream.class);
+    when(objectStream.read()).thenReturn(-1);
+
+    S3Object s3Object = mock(S3Object.class);
+    when(s3Object.getObjectContent()).thenReturn(objectStream);
+    when(s3Object.getObjectMetadata()).thenReturn(meta);
+    when(s3.getObject(any())).thenReturn(s3Object);
+
+    // Call read and then unbuffer
+    FSDataInputStream stream = fs.open(path);
+    assertEquals(0, stream.read(new byte[8])); // mocks read 0 bytes
+    stream.unbuffer();
+
+    // Verify that unbuffer closed the object stream
+    verify(objectStream, times(1)).close();
+  }
+}

+ 5 - 0
hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml

@@ -122,4 +122,9 @@
     <value>false</value>
   </property>
 
+  <property>
+    <name>fs.contract.supports-unbuffer</name>
+    <value>true</value>
+  </property>
+
 </configuration>