
Revert "HADOOP-16910. ABFS Streams to update FileSystem.Statistics counters on IO."

This reverts commit e2c7ac71b5ee47bb40294acd10c0c21dd6ee430f.

Change-Id: I5b5a93f5a36cdb0c3d56d1b3f747c318f089de20
Steve Loughran, 5 years ago
parent
commit 28afdce009
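
This revert strips the FileSystem.Statistics plumbing that HADOOP-16910 had threaded through the ABFS streams: AzureBlobFileSystemStore no longer takes a Statistics reference in createFile/openFileForWrite, AbfsOutputStream loses its statistics field, and the null-guarded incrementReadOps()/incrementWriteOps() helpers are deleted from both streams. For orientation, the pattern being removed looks roughly like the following minimal sketch, in which CountingOutputStream is a hypothetical stand-in for the real stream; only the guarded increment mirrors the deleted code:

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.fs.FileSystem;

class CountingOutputStream extends OutputStream {
  private final FileSystem.Statistics statistics; // may legitimately be null

  CountingOutputStream(FileSystem.Statistics statistics) {
    this.statistics = statistics;
  }

  @Override
  public void write(int b) throws IOException {
    // ... buffer the byte as the real stream would ...
    incrementWriteOps();
  }

  // Null-guarded increment, matching the helper deleted below.
  private void incrementWriteOps() {
    if (statistics != null) {
      statistics.incrementWriteOps(1);
    }
  }
}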

+ 2 - 2
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

@@ -188,7 +188,7 @@ public class AzureBlobFileSystem extends FileSystem {
     Path qualifiedPath = makeQualified(f);
 
     try {
-      OutputStream outputStream = abfsStore.createFile(qualifiedPath, statistics, overwrite,
+      OutputStream outputStream = abfsStore.createFile(qualifiedPath, overwrite,
           permission == null ? FsPermission.getFileDefault() : permission, FsPermission.getUMask(getConf()));
       return new FSDataOutputStream(outputStream, statistics);
     } catch(AzureBlobFileSystemException ex) {
@@ -250,7 +250,7 @@ public class AzureBlobFileSystem extends FileSystem {
     Path qualifiedPath = makeQualified(f);
 
     try {
-      OutputStream outputStream = abfsStore.openFileForWrite(qualifiedPath, statistics, false);
+      OutputStream outputStream = abfsStore.openFileForWrite(qualifiedPath, false);
       return new FSDataOutputStream(outputStream, statistics);
     } catch(AzureBlobFileSystemException ex) {
       checkException(f, ex);
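
Note that the statistics object itself stays at the filesystem layer: the retained "return new FSDataOutputStream(outputStream, statistics)" lines still hand the counters to the wrapper stream, so whatever accounting FSDataOutputStream performs is untouched; only the per-operation counting inside the ABFS streams is rolled back.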

+ 3 - 7
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

@@ -412,10 +412,8 @@ public class AzureBlobFileSystemStore implements Closeable {
     }
   }
 
-  public OutputStream createFile(final Path path,
-      final FileSystem.Statistics statistics,
-      final boolean overwrite, final FsPermission permission,
-      final FsPermission umask) throws AzureBlobFileSystemException {
+  public OutputStream createFile(final Path path, final boolean overwrite, final FsPermission permission,
+                                 final FsPermission umask) throws AzureBlobFileSystemException {
     try (AbfsPerfInfo perfInfo = startTracking("createFile", "createPath")) {
       boolean isNamespaceEnabled = getIsNamespaceEnabled();
       LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: {} umask: {} isNamespaceEnabled: {}",
@@ -438,7 +436,6 @@ public class AzureBlobFileSystemStore implements Closeable {
 
       return new AbfsOutputStream(
           client,
-          statistics,
           AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
           0,
           abfsConfiguration.getWriteBufferSize(),
@@ -499,7 +496,7 @@ public class AzureBlobFileSystemStore implements Closeable {
     }
   }
 
-  public OutputStream openFileForWrite(final Path path, final FileSystem.Statistics statistics, final boolean overwrite) throws
+  public OutputStream openFileForWrite(final Path path, final boolean overwrite) throws
           AzureBlobFileSystemException {
     try (AbfsPerfInfo perfInfo = startTracking("openFileForWrite", "getPathStatus")) {
       LOG.debug("openFileForWrite filesystem: {} path: {} overwrite: {}",
@@ -532,7 +529,6 @@ public class AzureBlobFileSystemStore implements Closeable {
 
       return new AbfsOutputStream(
           client,
-          statistics,
           AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
           offset,
           abfsConfiguration.getWriteBufferSize(),

+ 0 - 12
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java

@@ -101,7 +101,6 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     int currentLen = len;
     int lastReadBytes;
     int totalReadBytes = 0;
-    incrementReadOps();
     do {
       lastReadBytes = readOneBlock(b, currentOff, currentLen);
       if (lastReadBytes > 0) {
@@ -202,7 +201,6 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
       // try reading from buffers first
       receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
       if (receivedBytes > 0) {
-        incrementReadOps();
         return receivedBytes;
       }
 
@@ -238,7 +236,6 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) {
       op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag);
       perfInfo.registerResult(op.getResult()).registerSuccess(true);
-      incrementReadOps();
     } catch (AzureBlobFileSystemException ex) {
       if (ex instanceof AbfsRestOperationException) {
         AbfsRestOperationException ere = (AbfsRestOperationException) ex;
@@ -255,15 +252,6 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     return (int) bytesRead;
   }
 
-  /**
-   * Increment Read Operations.
-   */
-  private void incrementReadOps() {
-    if (statistics != null) {
-      statistics.incrementReadOps(1);
-    }
-  }
-
   /**
    * Seek to given position in stream.
    * @param n position to seek to
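
Note that this hunk leaves the statistics field of AbfsInputStream in place (the deleted helper reads it, and no field removal appears in this diff), which suggests the read path already held a Statistics reference before HADOOP-16910; only the read-op counting added by that change is rolled back here.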

+ 0 - 15
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java

@@ -39,7 +39,6 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.io.ElasticByteBufferPool;
-import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.Syncable;
@@ -81,11 +80,8 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
   private final ElasticByteBufferPool byteBufferPool
           = new ElasticByteBufferPool();
 
-  private final Statistics statistics;
-
   public AbfsOutputStream(
       final AbfsClient client,
-      final Statistics statistics,
       final String path,
       final long position,
       final int bufferSize,
@@ -94,7 +90,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
       final boolean supportAppendWithFlush,
       final boolean appendBlob) {
     this.client = client;
-    this.statistics = statistics;
     this.path = path;
     this.position = position;
     this.closed = false;
@@ -192,16 +187,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
 
       writableBytes = bufferSize - bufferIndex;
     }
-    incrementWriteOps();
-  }
-
-  /**
-   * Increment Write Operations.
-   */
-  private void incrementWriteOps() {
-    if (statistics != null) {
-      statistics.incrementWriteOps(1);
-    }
   }
 
   /**

+ 0 - 59
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsTestWithTimeout.java

@@ -17,19 +17,12 @@
  */
 package org.apache.hadoop.fs.azurebfs;
 
-import java.io.IOException;
-
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.rules.TestName;
 import org.junit.rules.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.Path;
 
 import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_TIMEOUT;
 
@@ -38,9 +31,6 @@ import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST
  * This class does not attempt to bind to Azure.
  */
 public class AbstractAbfsTestWithTimeout extends Assert {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(AbstractAbfsTestWithTimeout.class);
-
   /**
    * The name of the current method.
    */
@@ -77,53 +67,4 @@ public class AbstractAbfsTestWithTimeout extends Assert {
   protected int getTestTimeoutMillis() {
     return TEST_TIMEOUT;
   }
-
-  /**
-   * Describe a test in the logs.
-   *
-   * @param text text to print
-   * @param args arguments to format in the printing
-   */
-  protected void describe(String text, Object... args) {
-    LOG.info("\n\n{}: {}\n",
-        methodName.getMethodName(),
-        String.format(text, args));
-  }
-
-  /**
-   * Validate Contents written on a file in Abfs.
-   *
-   * @param fs                AzureBlobFileSystem
-   * @param path              Path of the file
-   * @param originalByteArray original byte array
-   * @return if content is validated true else, false
-   * @throws IOException
-   */
-  protected boolean validateContent(AzureBlobFileSystem fs, Path path,
-      byte[] originalByteArray)
-      throws IOException {
-    int pos = 0;
-    int lenOfOriginalByteArray = originalByteArray.length;
-
-    try (FSDataInputStream in = fs.open(path)) {
-      byte valueOfContentAtPos = (byte) in.read();
-
-      while (valueOfContentAtPos != -1 && pos < lenOfOriginalByteArray) {
-        if (originalByteArray[pos] != valueOfContentAtPos) {
-          assertEquals("Mismatch in content validation at position {}", pos,
-              originalByteArray[pos], valueOfContentAtPos);
-          return false;
-        }
-        valueOfContentAtPos = (byte) in.read();
-        pos++;
-      }
-      if (valueOfContentAtPos != -1) {
-        assertEquals("Expected end of file", -1, valueOfContentAtPos);
-        return false;
-      }
-      return true;
-    }
-
-  }
-
 }
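
Aside: the removed validateContent helper compared (byte) in.read() against -1, which conflates a legitimate 0xFF byte with end-of-file. Should a replacement ever be needed, a sketch along these lines avoids that trap (hypothetical code, not part of this commit):

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class ContentCheck {
  // Returns true iff the file at `path` contains exactly `expected`.
  static boolean validateContent(FileSystem fs, Path path, byte[] expected)
      throws IOException {
    try (FSDataInputStream in = fs.open(path)) {
      for (int pos = 0; pos < expected.length; pos++) {
        int c = in.read(); // keep as int: -1 is EOF, 0..255 is a data byte
        if (c == -1 || (byte) c != expected[pos]) {
          return false;
        }
      }
      return in.read() == -1; // no trailing bytes allowed
    }
  }
}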

+ 0 - 157
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStreamStatistics.java

@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-
-/**
- * Test Abfs Stream.
- */
-
-public class ITestAbfsStreamStatistics extends AbstractAbfsIntegrationTest {
-  public ITestAbfsStreamStatistics() throws Exception {
-  }
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ITestAbfsStreamStatistics.class);
-
-  private static int LARGE_NUMBER_OF_OPS = 1000000;
-
-  /***
-   * Testing {@code incrementReadOps()} in class {@code AbfsInputStream} and
-   * {@code incrementWriteOps()} in class {@code AbfsOutputStream}.
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testAbfsStreamOps() throws Exception {
-    describe("Test to see correct population of read and write operations in "
-        + "Abfs");
-
-    final AzureBlobFileSystem fs = getFileSystem();
-    Path smallOperationsFile = new Path("testOneReadWriteOps");
-    Path largeOperationsFile = new Path("testLargeReadWriteOps");
-    FileSystem.Statistics statistics = fs.getFsStatistics();
-    String testReadWriteOps = "test this";
-    statistics.reset();
-
-    //Test for zero write operation
-    assertReadWriteOps("write", 0, statistics.getWriteOps());
-
-    //Test for zero read operation
-    assertReadWriteOps("read", 0, statistics.getReadOps());
-
-    FSDataOutputStream outForOneOperation = null;
-    FSDataInputStream inForOneOperation = null;
-    try {
-      outForOneOperation = fs.create(smallOperationsFile);
-      statistics.reset();
-      outForOneOperation.write(testReadWriteOps.getBytes());
-
-      //Test for a single write operation
-      assertReadWriteOps("write", 1, statistics.getWriteOps());
-
-      //Flushing output stream to see content to read
-      outForOneOperation.hflush();
-      inForOneOperation = fs.open(smallOperationsFile);
-      statistics.reset();
-      int result = inForOneOperation.read(testReadWriteOps.getBytes(), 0,
-          testReadWriteOps.getBytes().length);
-
-      LOG.info("Result of Read operation : {}", result);
-      /*
-      Testing if 2 read_ops value is coming after reading full content from a
-      file (3 if anything to read from Buffer too).
-      Reason: read() call gives read_ops=1,
-      reading from AbfsClient(http GET) gives read_ops=2.
-       */
-      assertReadWriteOps("read", 2, statistics.getReadOps());
-
-    } finally {
-      IOUtils.cleanupWithLogger(LOG, inForOneOperation,
-          outForOneOperation);
-    }
-
-    //Validating if content is being written in the smallOperationsFile
-    assertTrue("Mismatch in content validation",
-        validateContent(fs, smallOperationsFile,
-            testReadWriteOps.getBytes()));
-
-    FSDataOutputStream outForLargeOperations = null;
-    FSDataInputStream inForLargeOperations = null;
-    StringBuilder largeOperationsValidationString = new StringBuilder();
-    try {
-      outForLargeOperations = fs.create(largeOperationsFile);
-      statistics.reset();
-      int largeValue = LARGE_NUMBER_OF_OPS;
-      for (int i = 0; i < largeValue; i++) {
-        outForLargeOperations.write(testReadWriteOps.getBytes());
-
-        //Creating the String for content Validation
-        largeOperationsValidationString.append(testReadWriteOps);
-      }
-      LOG.info("Number of bytes of Large data written: {}",
-          largeOperationsValidationString.toString().getBytes().length);
-
-      //Test for 1000000 write operations
-      assertReadWriteOps("write", largeValue, statistics.getWriteOps());
-
-      inForLargeOperations = fs.open(largeOperationsFile);
-      for (int i = 0; i < largeValue; i++) {
-        inForLargeOperations
-            .read(testReadWriteOps.getBytes(), 0,
-                testReadWriteOps.getBytes().length);
-      }
-
-      //Test for 1000000 read operations
-      assertReadWriteOps("read", largeValue, statistics.getReadOps());
-
-    } finally {
-      IOUtils.cleanupWithLogger(LOG, inForLargeOperations,
-          outForLargeOperations);
-    }
-    //Validating if content is being written in largeOperationsFile
-    assertTrue("Mismatch in content validation",
-        validateContent(fs, largeOperationsFile,
-            largeOperationsValidationString.toString().getBytes()));
-
-  }
-
-  /**
-   * Generic method to assert both Read an write operations.
-   *
-   * @param operation     what operation is being asserted
-   * @param expectedValue value which is expected
-   * @param actualValue   value which is actual
-   */
-
-  private void assertReadWriteOps(String operation, long expectedValue,
-      long actualValue) {
-    assertEquals("Mismatch in " + operation + " operations", expectedValue,
-        actualValue);
-  }
-}
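
The deleted test drove the counters through the public FileSystem.Statistics API (reset(), getReadOps(), getWriteOps()), and that API itself survives this revert. A small, hypothetical local-filesystem illustration of the same API (not ABFS-specific; which counters a given FileSystem implementation updates is implementation-dependent):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StatisticsDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    // Per-scheme statistics, keyed by the filesystem class.
    FileSystem.Statistics stats = FileSystem.getStatistics(
        fs.getUri().getScheme(), fs.getClass());
    stats.reset();

    try (FSDataOutputStream out = fs.create(new Path("/tmp/stats-demo.txt"), true)) {
      out.write("test this".getBytes());
    }
    System.out.println("write ops:     " + stats.getWriteOps());
    System.out.println("bytes written: " + stats.getBytesWritten());
  }
}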

+ 2 - 9
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemOauth.java

@@ -22,21 +22,18 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.Map;
 
+import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
 import org.junit.Assume;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
 import org.apache.hadoop.fs.azurebfs.services.AuthType;
-import org.apache.hadoop.io.IOUtils;
 
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET;
@@ -55,8 +52,6 @@ public class ITestAzureBlobFileSystemOauth extends AbstractAbfsIntegrationTest{
   private static final Path FILE_PATH = new Path("/testFile");
   private static final Path EXISTED_FILE_PATH = new Path("/existedFile");
   private static final Path EXISTED_FOLDER_PATH = new Path("/existedFolder");
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ITestAbfsStreamStatistics.class);
 
   public ITestAzureBlobFileSystemOauth() throws Exception {
     Assume.assumeTrue(this.getAuthType() == AuthType.OAuth);
@@ -148,11 +143,9 @@ public class ITestAzureBlobFileSystemOauth extends AbstractAbfsIntegrationTest{
 
     // TEST WRITE FILE
     try {
-      abfsStore.openFileForWrite(EXISTED_FILE_PATH, fs.getFsStatistics(), true);
+      abfsStore.openFileForWrite(EXISTED_FILE_PATH, true);
     } catch (AbfsRestOperationException e) {
       assertEquals(AzureServiceErrorCode.AUTHORIZATION_PERMISSION_MISS_MATCH, e.getErrorCode());
-    } finally {
-      IOUtils.cleanupWithLogger(LOG, abfsStore);
     }
 
   }