فهرست منبع

HADOOP-19542. S3A: Close AAL factory on service stop. (#7616)

Contributed by: Ahmar Suhail.
ahmarsuhail 1 هفته پیش
والد
کامیت
ed7e7dabcc

+ 7 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java

@@ -104,6 +104,13 @@ public final class StreamStatisticNames {
    */
    */
   public static final String STREAM_READ_ANALYTICS_OPENED = "stream_read_analytics_opened";
   public static final String STREAM_READ_ANALYTICS_OPENED = "stream_read_analytics_opened";
 
 
+  /**
+   * Total count of times object stream factory was closed.
+   *
+   * Value: {@value}.
+   */
+  public static final String ANALYTICS_STREAM_FACTORY_CLOSED = "analytics_stream_factory_closed";
+
   /**
   /**
    * Count of exceptions raised during input stream reads.
    * Count of exceptions raised during input stream reads.
    * Value: {@value}.
    * Value: {@value}.

+ 4 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java

@@ -352,6 +352,10 @@ public enum Statistic {
       StreamStatisticNames.STREAM_READ_CLOSE_OPERATIONS,
       StreamStatisticNames.STREAM_READ_CLOSE_OPERATIONS,
       "Total count of times an attempt to close an input stream was made",
       "Total count of times an attempt to close an input stream was made",
       TYPE_COUNTER),
       TYPE_COUNTER),
+  ANALYTICS_STREAM_FACTORY_CLOSED(
+          "analytics_stream_factory_closed",
+          "Count of times the analytics stream factory was closed",
+          TYPE_COUNTER),
   STREAM_READ_EXCEPTIONS(
   STREAM_READ_EXCEPTIONS(
       StreamStatisticNames.STREAM_READ_EXCEPTIONS,
       StreamStatisticNames.STREAM_READ_EXCEPTIONS,
       "Count of exceptions raised during input stream reads",
       "Count of exceptions raised during input stream reads",

+ 6 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java

@@ -996,6 +996,12 @@ public class S3AStoreImpl
       LOG.debug("Stream factory requested async client");
       LOG.debug("Stream factory requested async client");
       return clientManager().getOrCreateAsyncClient();
       return clientManager().getOrCreateAsyncClient();
     }
     }
+
+    @Override
+    public void incrementFactoryStatistic(Statistic statistic) {
+      incrementStatistic(statistic);
+    }
+
   }
   }
 
 
   /*
   /*

+ 8 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.util.functional.CallableRaisingIOE;
 import org.apache.hadoop.util.functional.LazyAutoCloseableReference;
 import org.apache.hadoop.util.functional.LazyAutoCloseableReference;
 
 
 import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
 import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
+import static org.apache.hadoop.fs.s3a.Statistic.ANALYTICS_STREAM_FACTORY_CLOSED;
 import static org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration.populateVectoredIOContext;
 import static org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration.populateVectoredIOContext;
 
 
 /**
 /**
@@ -95,6 +96,13 @@ public class AnalyticsStreamFactory extends AbstractObjectInputStreamFactory {
             StreamFactoryRequirements.Requirements.ExpectUnauditedGetRequests);
             StreamFactoryRequirements.Requirements.ExpectUnauditedGetRequests);
   }
   }
 
 
+  @Override
+  protected void serviceStop() throws Exception {
+    this.s3SeekableInputStreamFactory.close();
+    callbacks().incrementFactoryStatistic(ANALYTICS_STREAM_FACTORY_CLOSED);
+    super.serviceStop();
+  }
+
   private S3SeekableInputStreamFactory getOrCreateS3SeekableInputStreamFactory()
   private S3SeekableInputStreamFactory getOrCreateS3SeekableInputStreamFactory()
         throws IOException {
         throws IOException {
     return s3SeekableInputStreamFactory.eval();
     return s3SeekableInputStreamFactory.eval();

+ 3 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java

@@ -22,6 +22,7 @@ import java.io.IOException;
 
 
 import software.amazon.awssdk.services.s3.S3AsyncClient;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
 
 
+import org.apache.hadoop.fs.s3a.Statistic;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.service.Service;
 
 
@@ -85,6 +86,8 @@ public interface ObjectInputStreamFactory
      * @throws IOException failure to create the client.
      * @throws IOException failure to create the client.
      */
      */
     S3AsyncClient getOrCreateAsyncClient(boolean requireCRT) throws IOException;
     S3AsyncClient getOrCreateAsyncClient(boolean requireCRT) throws IOException;
+
+    void incrementFactoryStatistic(Statistic statistic);
   }
   }
 }
 }
 
 

+ 3 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java

@@ -48,6 +48,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides
 import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
 import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.ANALYTICS_STREAM_FACTORY_CLOSED;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 
 /**
 /**
@@ -106,6 +107,8 @@ public class ITestS3AAnalyticsAcceleratorStreamReading extends AbstractS3ATestBa
     }
     }
 
 
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+    fs.close();
+    verifyStatisticCounterValue(fs.getIOStatistics(), ANALYTICS_STREAM_FACTORY_CLOSED, 1);
   }
   }
 
 
   @Test
   @Test

+ 6 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/streams/TestStreamFactories.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a.impl.streams;
 import java.io.IOException;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.io.UncheckedIOException;
 
 
+import org.apache.hadoop.fs.s3a.Statistic;
 import org.assertj.core.api.Assertions;
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
 import org.junit.Test;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
@@ -334,6 +335,11 @@ public class TestStreamFactories extends AbstractHadoopTestBase {
     public S3AsyncClient getOrCreateAsyncClient(final boolean requireCRT) throws IOException {
     public S3AsyncClient getOrCreateAsyncClient(final boolean requireCRT) throws IOException {
       throw new UnsupportedOperationException("not implemented");
       throw new UnsupportedOperationException("not implemented");
     }
     }
+
+    @Override
+    public void incrementFactoryStatistic(Statistic statistic) {
+      throw new UnsupportedOperationException("not implemented");
+    }
   }
   }
 
 
 }
 }