Procházet zdrojové kódy

HADOOP-16965. Refactor abfs stream configuration. (#1956)

Contributed by Mukund Thakur.
Mukund Thakur před 5 roky
rodič
revize
98fdbb820e

+ 22 - 8
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

@@ -81,7 +81,9 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
 import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
 import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
+import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext;
 import org.apache.hadoop.fs.azurebfs.services.AbfsPermission;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
 import org.apache.hadoop.fs.azurebfs.services.AuthType;
@@ -415,12 +417,18 @@ public class AzureBlobFileSystemStore implements Closeable {
           statistics,
           AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
           0,
-          abfsConfiguration.getWriteBufferSize(),
-          abfsConfiguration.isFlushEnabled(),
-          abfsConfiguration.isOutputStreamFlushDisabled());
+          populateAbfsOutputStreamContext());
     }
   }
 
+  private AbfsOutputStreamContext populateAbfsOutputStreamContext() {
+    return new AbfsOutputStreamContext()
+            .withWriteBufferSize(abfsConfiguration.getWriteBufferSize())
+            .enableFlush(abfsConfiguration.isFlushEnabled())
+            .disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
+            .build();
+  }
+
   public void createDirectory(final Path path, final FsPermission permission, final FsPermission umask)
       throws AzureBlobFileSystemException {
     try (AbfsPerfInfo perfInfo = startTracking("createDirectory", "createPath")) {
@@ -466,11 +474,19 @@ public class AzureBlobFileSystemStore implements Closeable {
       // Add statistics for InputStream
       return new AbfsInputStream(client, statistics,
               AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength,
-              abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(),
-              abfsConfiguration.getTolerateOobAppends(), eTag);
+              populateAbfsInputStreamContext(),
+              eTag);
     }
   }
 
+  private AbfsInputStreamContext populateAbfsInputStreamContext() {
+    return new AbfsInputStreamContext()
+            .withReadBufferSize(abfsConfiguration.getReadBufferSize())
+            .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
+            .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
+            .build();
+  }
+
   public OutputStream openFileForWrite(final Path path, final FileSystem.Statistics statistics, final boolean overwrite) throws
           AzureBlobFileSystemException {
     try (AbfsPerfInfo perfInfo = startTracking("openFileForWrite", "getPathStatus")) {
@@ -502,9 +518,7 @@ public class AzureBlobFileSystemStore implements Closeable {
           statistics,
           AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
           offset,
-          abfsConfiguration.getWriteBufferSize(),
-          abfsConfiguration.isFlushEnabled(),
-          abfsConfiguration.isOutputStreamFlushDisabled());
+          populateAbfsOutputStreamContext());
     }
   }
 

+ 9 - 11
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java

@@ -61,21 +61,19 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
   private boolean closed = false;
 
   public AbfsInputStream(
-      final AbfsClient client,
-      final Statistics statistics,
-      final String path,
-      final long contentLength,
-      final int bufferSize,
-      final int readAheadQueueDepth,
-      final boolean tolerateOobAppends,
-      final String eTag) {
+          final AbfsClient client,
+          final Statistics statistics,
+          final String path,
+          final long contentLength,
+          final AbfsInputStreamContext abfsInputStreamContext,
+          final String eTag) {
     this.client = client;
     this.statistics = statistics;
     this.path = path;
     this.contentLength = contentLength;
-    this.bufferSize = bufferSize;
-    this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
-    this.tolerateOobAppends = tolerateOobAppends;
+    this.bufferSize = abfsInputStreamContext.getReadBufferSize();
+    this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth();
+    this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
     this.eTag = eTag;
     this.readAheadEnabled = true;
   }

+ 70 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java

@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+/**
+ * Class to hold extra input stream configs.
+ */
+public class AbfsInputStreamContext extends AbfsStreamContext {
+
+  private int readBufferSize;
+
+  private int readAheadQueueDepth;
+
+  private boolean tolerateOobAppends;
+
+  public AbfsInputStreamContext() {
+  }
+
+  public AbfsInputStreamContext withReadBufferSize(final int readBufferSize) {
+    this.readBufferSize = readBufferSize;
+    return this;
+  }
+
+  public AbfsInputStreamContext withReadAheadQueueDepth(
+          final int readAheadQueueDepth) {
+    this.readAheadQueueDepth = (readAheadQueueDepth >= 0)
+            ? readAheadQueueDepth
+            : Runtime.getRuntime().availableProcessors();
+    return this;
+  }
+
+  public AbfsInputStreamContext withTolerateOobAppends(
+          final boolean tolerateOobAppends) {
+    this.tolerateOobAppends = tolerateOobAppends;
+    return this;
+  }
+
+  public AbfsInputStreamContext build() {
+    // Validation of parameters to be done here.
+    return this;
+  }
+
+  public int getReadBufferSize() {
+    return readBufferSize;
+  }
+
+  public int getReadAheadQueueDepth() {
+    return readAheadQueueDepth;
+  }
+
+  public boolean isTolerateOobAppends() {
+    return tolerateOobAppends;
+  }
+}

+ 9 - 10
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java

@@ -82,23 +82,22 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
   private final Statistics statistics;
 
   public AbfsOutputStream(
-      final AbfsClient client,
-      final Statistics statistics,
-      final String path,
-      final long position,
-      final int bufferSize,
-      final boolean supportFlush,
-      final boolean disableOutputStreamFlush) {
+          final AbfsClient client,
+          final Statistics statistics,
+          final String path,
+          final long position,
+          AbfsOutputStreamContext abfsOutputStreamContext) {
     this.client = client;
     this.statistics = statistics;
     this.path = path;
     this.position = position;
     this.closed = false;
-    this.supportFlush = supportFlush;
-    this.disableOutputStreamFlush = disableOutputStreamFlush;
+    this.supportFlush = abfsOutputStreamContext.isEnableFlush();
+    this.disableOutputStreamFlush = abfsOutputStreamContext
+            .isDisableOutputStreamFlush();
     this.lastError = null;
     this.lastFlushOffset = 0;
-    this.bufferSize = bufferSize;
+    this.bufferSize = abfsOutputStreamContext.getWriteBufferSize();
     this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
     this.bufferIndex = 0;
     this.writeOperations = new ConcurrentLinkedDeque<>();

+ 68 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java

@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+/**
+ * Class to hold extra output stream configs.
+ */
+public class AbfsOutputStreamContext extends AbfsStreamContext {
+
+  private int writeBufferSize;
+
+  private boolean enableFlush;
+
+  private boolean disableOutputStreamFlush;
+
+  public AbfsOutputStreamContext() {
+  }
+
+  public AbfsOutputStreamContext withWriteBufferSize(
+          final int writeBufferSize) {
+    this.writeBufferSize = writeBufferSize;
+    return this;
+  }
+
+  public AbfsOutputStreamContext enableFlush(final boolean enableFlush) {
+    this.enableFlush = enableFlush;
+    return this;
+  }
+
+  public AbfsOutputStreamContext disableOutputStreamFlush(
+          final boolean disableOutputStreamFlush) {
+    this.disableOutputStreamFlush = disableOutputStreamFlush;
+    return this;
+  }
+
+  public AbfsOutputStreamContext build() {
+    // Validation of parameters to be done here.
+    return this;
+  }
+
+  public int getWriteBufferSize() {
+    return writeBufferSize;
+  }
+
+  public boolean isEnableFlush() {
+    return enableFlush;
+  }
+
+  public boolean isDisableOutputStreamFlush() {
+    return disableOutputStreamFlush;
+  }
+}

+ 26 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java

@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+/**
+ * Base stream configuration class which is going
+ * to store common configs among input and output streams.
+ */
+public abstract class AbfsStreamContext {
+}