HADOOP-18781. ABFS backReference passed down to streams to avoid GC closing the FS. (#5830)

To avoid the ABFS instance getting closed due to GC while its streams are still working, attach the ABFS instance to an opaque BackReference object and pass it down to the streams, so that a hard reference to the filesystem is held for as long as the streams are in use.

Contributed by: Mehakmeet Singh
Mehakmeet Singh, 1 year ago
Commit e4b39b9cb0
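
In outline, the fix threads one BackReference object through three layers. A condensed sketch of the wiring, with method bodies elided and all names taken from the diff below:

```java
// AzureBlobFileSystem.initialize(): the FS hands a hard reference to
// itself to the store it builds.
systemStoreBuilder.withBackReference(new BackReference(this));

// AzureBlobFileSystemStore: the store keeps that reference and forwards
// it into every stream context it creates.
inputStreamContext.withAbfsBackRef(fsBackRef);
outputStreamContext.withAbfsBackRef(fsBackRef);

// AbfsInputStream / AbfsOutputStream constructors: each open stream pins
// the FS for its own lifetime.
this.fsBackRef = abfsOutputStreamContext.getFsBackRef();
```

Routing the reference through the existing context/builder objects keeps the stream constructors' signatures unchanged.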

+ 48 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/BackReference.java

@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import javax.annotation.Nullable;
+
+/**
+ * Holds reference to an object to be attached to a stream or store to avoid
+ * the reference being lost to GC.
+ */
+public class BackReference {
+  private final Object reference;
+
+  public BackReference(@Nullable Object reference) {
+    this.reference = reference;
+  }
+
+  /**
+   * is the reference null?
+   * @return true if the ref. is null, else false.
+   */
+  public boolean isNull() {
+    return reference == null;
+  }
+
+  @Override
+  public String toString() {
+    return "BackReference{" +
+        "reference=" + reference +
+        '}';
+  }
+}
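
The class is deliberately opaque: all that matters is that a live stream holds a strong reference to the filesystem, keeping it strongly reachable and hence ineligible for collection. A minimal, self-contained sketch (not part of the patch) of the reachability behaviour this relies on:

```java
import java.lang.ref.WeakReference;

public class BackReferenceDemo {
  public static void main(String[] args) throws InterruptedException {
    Object fs = new Object();                 // stands in for the FS instance
    Object backRef = fs;                      // the stream's hard reference
    WeakReference<Object> observer = new WeakReference<>(fs);

    fs = null;       // the caller drops its own reference to the FS
    System.gc();     // a hint only; collection is never guaranteed
    Thread.sleep(100);
    // Still strongly reachable through backRef, so not collected.
    System.out.println("collected: " + (observer.get() == null)); // false

    backRef = null;  // stream closed: the pin is released
    System.gc();
    Thread.sleep(100);
    System.out.println("collected: " + (observer.get() == null)); // usually true
  }
}
```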

+ 7 - 2
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

@@ -45,6 +45,7 @@ import java.util.concurrent.Future;
 import javax.annotation.Nullable;
 
 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.impl.BackReference;
 import org.apache.hadoop.security.ProviderUtils;
 import org.apache.hadoop.util.Preconditions;
 import org.slf4j.Logger;
@@ -155,6 +156,9 @@ public class AzureBlobFileSystem extends FileSystem
   /** Rate limiting for operations which use it to throttle their IO. */
   private RateLimiting rateLimiting;
 
+  /** Storing full path uri for better logging. */
+  private URI fullPathUri;
+
   @Override
   public void initialize(URI uri, Configuration configuration)
       throws IOException {
@@ -165,7 +169,7 @@ public class AzureBlobFileSystem extends FileSystem
     setConf(configuration);
 
     LOG.debug("Initializing AzureBlobFileSystem for {}", uri);
-
+    this.fullPathUri = uri;
     this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
     abfsCounters = new AbfsCountersImpl(uri);
     // name of the blockFactory to be used.
@@ -192,6 +196,7 @@ public class AzureBlobFileSystem extends FileSystem
             .withAbfsCounters(abfsCounters)
             .withBlockFactory(blockFactory)
             .withBlockOutputActiveBlocks(blockOutputActiveBlocks)
+            .withBackReference(new BackReference(this))
             .build();
 
     this.abfsStore = new AzureBlobFileSystemStore(systemStoreBuilder);
@@ -236,7 +241,7 @@ public class AzureBlobFileSystem extends FileSystem
   public String toString() {
     final StringBuilder sb = new StringBuilder(
         "AzureBlobFileSystem{");
-    sb.append("uri=").append(uri);
+    sb.append("uri=").append(fullPathUri);
     sb.append(", user='").append(abfsStore.getUser()).append('\'');
     sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
     sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]");

+ 14 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

@@ -56,6 +56,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.impl.BackReference;
 import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
@@ -189,6 +190,9 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
   /** Bounded ThreadPool for this instance. */
   private ExecutorService boundedThreadPool;
 
+  /** ABFS instance reference to be held by the store to avoid GC close. */
+  private BackReference fsBackRef;
+
   /**
    * FileSystem Store for {@link AzureBlobFileSystem} for Abfs operations.
    * Built using the {@link AzureBlobFileSystemStoreBuilder} with parameters
@@ -202,6 +206,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
     String[] authorityParts = authorityParts(uri);
     final String fileSystemName = authorityParts[0];
     final String accountName = authorityParts[1];
+    this.fsBackRef = abfsStoreBuilder.fsBackRef;
 
     leaseRefs = Collections.synchronizedMap(new WeakHashMap<>());
 
@@ -711,6 +716,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
             .withExecutorService(new SemaphoredDelegatingExecutor(boundedThreadPool,
                 blockOutputActiveBlocks, true))
             .withTracingContext(tracingContext)
+            .withAbfsBackRef(fsBackRef)
             .build();
   }
 
@@ -818,6 +824,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
                 abfsConfiguration.shouldReadBufferSizeAlways())
             .withReadAheadBlockSize(abfsConfiguration.getReadAheadBlockSize())
             .withBufferedPreadDisabled(bufferedPreadDisabled)
+            .withAbfsBackRef(fsBackRef)
             .build();
   }
 
@@ -1871,6 +1878,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
     private AbfsCounters abfsCounters;
     private DataBlocks.BlockFactory blockFactory;
     private int blockOutputActiveBlocks;
+    private BackReference fsBackRef;
 
     public AzureBlobFileSystemStoreBuilder withUri(URI value) {
       this.uri = value;
@@ -1906,6 +1914,12 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
       return this;
     }
 
+    public AzureBlobFileSystemStoreBuilder withBackReference(
+        BackReference fsBackRef) {
+      this.fsBackRef = fsBackRef;
+      return this;
+    }
+
     public AzureBlobFileSystemStoreBuilder build() {
       return this;
     }

+ 10 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java

@@ -27,6 +27,7 @@ import java.util.UUID;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.impl.BackReference;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -121,6 +122,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
    */
   private long nextReadPos;
 
+  /** ABFS instance to be held by the input stream to avoid GC close. */
+  private final BackReference fsBackRef;
+
   public AbfsInputStream(
           final AbfsClient client,
           final Statistics statistics,
@@ -152,6 +156,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     this.tracingContext.setStreamID(inputStreamId);
     this.context = abfsInputStreamContext;
     readAheadBlockSize = abfsInputStreamContext.getReadAheadBlockSize();
+    this.fsBackRef = abfsInputStreamContext.getFsBackRef();
 
     // Propagate the config values to ReadBufferManager so that the first instance
     // to initialize can set the readAheadBlockSize
@@ -857,4 +862,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
   long getLimit() {
     return this.limit;
   }
+
+  @VisibleForTesting
+  BackReference getFsBackRef() {
+    return fsBackRef;
+  }
 }

+ 14 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java

@@ -22,6 +22,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 
+import org.apache.hadoop.fs.impl.BackReference;
 /**
  * Class to hold extra input stream configs.
  */
@@ -51,6 +52,9 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
 
   private boolean bufferedPreadDisabled;
 
+  /** A BackReference to the FS instance that created this input stream. */
+  private BackReference fsBackRef;
+
   public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
     super(sasTokenRenewPeriodForStreamsInSeconds);
   }
@@ -122,6 +126,12 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
     return this;
   }
 
+  public AbfsInputStreamContext withAbfsBackRef(
+      final BackReference fsBackRef) {
+    this.fsBackRef = fsBackRef;
+    return this;
+  }
+
   public AbfsInputStreamContext build() {
     if (readBufferSize > readAheadBlockSize) {
       LOG.debug(
@@ -180,4 +190,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
   public boolean isBufferedPreadDisabled() {
     return bufferedPreadDisabled;
   }
+
+  public BackReference getFsBackRef() {
+    return fsBackRef;
+  }
 }

+ 21 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java

@@ -28,6 +28,7 @@ import java.util.UUID;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.impl.BackReference;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
@@ -126,6 +127,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
   /** Executor service to carry out the parallel upload requests. */
   private final ListeningExecutorService executorService;
 
+  /** ABFS instance to be held by the output stream to avoid GC close. */
+  private final BackReference fsBackRef;
+
   public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
       throws IOException {
     this.client = abfsOutputStreamContext.getClient();
@@ -147,6 +151,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
     this.numOfAppendsToServerSinceLastFlush = 0;
     this.writeOperations = new ConcurrentLinkedDeque<>();
     this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics();
+    this.fsBackRef = abfsOutputStreamContext.getFsBackRef();
 
     if (this.isAppendBlob) {
       this.maxConcurrentRequestCount = 1;
@@ -488,6 +493,12 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
     }
 
     try {
+      // Check whether the executor service was shut down before the writes
+      // could be completed.
+      if (hasActiveBlockDataToUpload() && executorService.isShutdown()) {
+        throw new PathIOException(path, "Executor Service closed before "
+            + "writes could be completed.");
+      }
       flushInternal(true);
     } catch (IOException e) {
       // Problems surface in try-with-resources clauses if
@@ -766,4 +777,14 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
     sb.append("}");
     return sb.toString();
   }
+
+  @VisibleForTesting
+  BackReference getFsBackRef() {
+    return fsBackRef;
+  }
+
+  @VisibleForTesting
+  ListeningExecutorService getExecutorService() {
+    return executorService;
+  }
 }

+ 14 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java

@@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.impl.BackReference;
 import org.apache.hadoop.fs.store.DataBlocks;
 
 /**
@@ -65,6 +66,9 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
 
   private TracingContext tracingContext;
 
+  /** A BackReference to the FS instance that created this OutputStream. */
+  private BackReference fsBackRef;
+
   public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
     super(sasTokenRenewPeriodForStreamsInSeconds);
   }
@@ -157,6 +161,12 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
     return this;
   }
 
+  public AbfsOutputStreamContext withAbfsBackRef(
+      final BackReference fsBackRef) {
+    this.fsBackRef = fsBackRef;
+    return this;
+  }
+
   public AbfsOutputStreamContext build() {
     // Validation of parameters to be done here.
     if (streamStatistics == null) {
@@ -261,4 +271,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
   public TracingContext getTracingContext() {
     return tracingContext;
   }
+
+  public BackReference getFsBackRef() {
+    return fsBackRef;
+  }
 }

+ 21 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java

@@ -32,6 +32,8 @@ import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
 
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
@@ -106,6 +108,25 @@ public class ITestAbfsInputStream extends AbstractAbfsIntegrationTest {
     }
   }
 
+  /**
+   * Testing the back reference being passed down to AbfsInputStream.
+   */
+  @Test
+  public void testAzureBlobFileSystemBackReferenceInInputStream()
+      throws IOException {
+    Path path = path(getMethodName());
+    // Create a file, then open it to verify that the resulting input
+    // stream holds a back reference.
+    try (FSDataOutputStream out = getFileSystem().create(path);
+        FSDataInputStream in = getFileSystem().open(path)) {
+      AbfsInputStream abfsInputStream = (AbfsInputStream) in.getWrappedStream();
+
+      Assertions.assertThat(abfsInputStream.getFsBackRef().isNull())
+          .describedAs("BackReference in input stream should not be null")
+          .isFalse();
+    }
+  }
+
   private void testExceptionInOptimization(final FileSystem fs,
       final Path testFilePath,
       final int seekPos, final int length, final byte[] fileContent)

+ 78 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java

@@ -18,19 +18,28 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
 import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
+import org.apache.hadoop.test.LambdaTestUtils;
 
 /**
  * Test create operation.
  */
 public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {
+
+  private static final int TEST_EXECUTION_TIMEOUT = 2 * 60 * 1000;
   private static final String TEST_FILE_PATH = "testfile";
 
   public ITestAbfsOutputStream() throws Exception {
@@ -84,4 +93,73 @@ public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {
     }
   }
 
+  /**
+   * Verify the passing of AzureBlobFileSystem reference to AbfsOutputStream
+   * to make sure that the FS instance is not eligible for GC while writing.
+   */
+  @Test(timeout = TEST_EXECUTION_TIMEOUT)
+  public void testAzureBlobFileSystemBackReferenceInOutputStream()
+      throws Exception {
+    byte[] testBytes = new byte[5 * 1024];
+    // Create the output stream via an FS instance held only in a separate
+    // method, so that the FS becomes eligible for GC: once that method's
+    // stack frame is popped, its local variables are unreachable, which
+    // raises the chance of the FS being garbage collected.
+    try (AbfsOutputStream out = getStream()) {
+      // After every 5KB block written, the stream is flushed and a GC is
+      // hinted; if the executor service were shut down in between, the test
+      // would fail, indicating premature shutdown while writing.
+      for (int i = 0; i < 5; i++) {
+        out.write(testBytes);
+        out.flush();
+        System.gc();
+        Assertions.assertThat(
+            out.getExecutorService().isShutdown() || out.getExecutorService()
+                .isTerminated())
+            .describedAs("Executor Service should not be closed before "
+                + "OutputStream while writing")
+            .isFalse();
+        Assertions.assertThat(out.getFsBackRef().isNull())
+            .describedAs("BackReference in output stream should not be null")
+            .isFalse();
+      }
+    }
+  }
+
+  /**
+   * Verify AbfsOutputStream close() behaviour of throwing a PathIOE when the
+   * FS instance is closed before the stream.
+   */
+  @Test
+  public void testAbfsOutputStreamClosingFsBeforeStream()
+      throws Exception {
+    AzureBlobFileSystem fs = new AzureBlobFileSystem();
+    fs.initialize(new URI(getTestUrl()), new Configuration());
+    Path pathFs = path(getMethodName());
+    byte[] inputBytes = new byte[5 * 1024];
+    try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
+        pathFs)) {
+      out.write(inputBytes);
+      fs.close();
+      // verify that closing the output stream after fs.close() raises a
+      // PathIOException containing the path being written to.
+      LambdaTestUtils
+          .intercept(PathIOException.class, getMethodName(), out::close);
+    }
+  }
+
+  /**
+   * Separate method to create an outputStream using a local FS instance so
+   * that once this method has returned, the FS instance can be eligible for GC.
+   *
+   * @return AbfsOutputStream used for writing.
+   */
+  private AbfsOutputStream getStream() throws URISyntaxException, IOException {
+    AzureBlobFileSystem fs1 = new AzureBlobFileSystem();
+    fs1.initialize(new URI(getTestUrl()), new Configuration());
+    Path pathFs1 = path(getMethodName() + "1");
+
+    return createAbfsOutputStreamWithFlushEnabled(fs1, pathFs1);
+  }
+
 }