Browse Source

HADOOP-15917. AliyunOSS: fix incorrect ReadOps and WriteOps in statistics. Contributed by Jinhu Wu.

(cherry picked from commit 3fade865ce84dcf68bcd7de5a5ed1c7d904796e9)
(cherry picked from commit 64cb97fb4467513f73fde18f96f391ad34e3bb0a)
(cherry picked from commit 5d532cfc6f23f942ed10edab55ed251eb99a0664)
(cherry picked from commit 37082a664aaf99bc40522a8dfa231d71792dd976)
Sammi Chen 6 năm trước cách đây
mục cha
commit
3aac324a07

+ 0 - 4
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java

@@ -457,7 +457,6 @@ public class AliyunOSSFileSystem extends FileSystem {
 
       ObjectListing objects = store.listObjects(key, maxKeys, null, false);
       while (true) {
-        statistics.incrementReadOps(1);
         for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
           String objKey = objectSummary.getKey();
           if (objKey.equals(key + "/")) {
@@ -498,7 +497,6 @@ public class AliyunOSSFileSystem extends FileSystem {
           }
           String nextMarker = objects.getNextMarker();
           objects = store.listObjects(key, maxKeys, nextMarker, false);
-          statistics.incrementReadOps(1);
         } else {
           break;
         }
@@ -694,7 +692,6 @@ public class AliyunOSSFileSystem extends FileSystem {
         new SemaphoredDelegatingExecutor(boundedCopyThreadPool,
             maxConcurrentCopyTasksPerDir, true));
     ObjectListing objects = store.listObjects(srcKey, maxKeys, null, true);
-    statistics.incrementReadOps(1);
     // Copy files from src folder to dst
     int copiesToFinish = 0;
     while (true) {
@@ -717,7 +714,6 @@ public class AliyunOSSFileSystem extends FileSystem {
       if (objects.isTruncated()) {
         String nextMarker = objects.getNextMarker();
         objects = store.listObjects(srcKey, maxKeys, nextMarker, true);
-        statistics.incrementReadOps(1);
       } else {
         break;
       }

+ 18 - 4
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java

@@ -175,6 +175,7 @@ public class AliyunOSSFileSystemStore {
       CannedAccessControlList cannedACL =
           CannedAccessControlList.valueOf(cannedACLName);
       ossClient.setBucketAcl(bucketName, cannedACL);
+      statistics.incrementWriteOps(1);
     }
 
     maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
@@ -216,6 +217,7 @@ public class AliyunOSSFileSystemStore {
       // Here, we choose the simple mode to do batch delete.
       deleteRequest.setQuiet(true);
       DeleteObjectsResult result = ossClient.deleteObjects(deleteRequest);
+      statistics.incrementWriteOps(1);
       deleteFailed = result.getDeletedObjects();
       tries++;
       if (tries == retry) {
@@ -268,11 +270,13 @@ public class AliyunOSSFileSystemStore {
    */
   public ObjectMetadata getObjectMetadata(String key) {
     try {
-      return ossClient.getObjectMetadata(bucketName, key);
+      ObjectMetadata objectMeta = ossClient.getObjectMetadata(bucketName, key);
+      statistics.incrementReadOps(1);
+      return objectMeta;
     } catch (OSSException osse) {
+      LOG.error("Exception thrown when get object meta: "
+              + key + ", exception: " + osse);
       return null;
-    } finally {
-      statistics.incrementReadOps(1);
     }
   }
 
@@ -289,6 +293,7 @@ public class AliyunOSSFileSystemStore {
     dirMeta.setContentLength(0);
     try {
       ossClient.putObject(bucketName, key, in, dirMeta);
+      statistics.incrementWriteOps(1);
     } finally {
       in.close();
     }
@@ -304,6 +309,7 @@ public class AliyunOSSFileSystemStore {
   public boolean copyFile(String srcKey, String dstKey) {
     ObjectMetadata objectMeta =
         ossClient.getObjectMetadata(bucketName, srcKey);
+    statistics.incrementReadOps(1);
     long contentLength = objectMeta.getContentLength();
     if (contentLength <= multipartThreshold) {
       return singleCopy(srcKey, dstKey);
@@ -323,6 +329,7 @@ public class AliyunOSSFileSystemStore {
   private boolean singleCopy(String srcKey, String dstKey) {
     CopyObjectResult copyResult =
         ossClient.copyObject(bucketName, srcKey, bucketName, dstKey);
+    statistics.incrementWriteOps(1);
     LOG.debug(copyResult.getETag());
     return true;
   }
@@ -372,6 +379,7 @@ public class AliyunOSSFileSystemStore {
         UploadPartCopyResult partCopyResult =
             ossClient.uploadPartCopy(partCopyRequest);
         statistics.incrementWriteOps(1);
+        statistics.incrementBytesWritten(size);
         partETags.add(partCopyResult.getPartETag());
       }
       CompleteMultipartUploadRequest completeMultipartUploadRequest =
@@ -408,6 +416,7 @@ public class AliyunOSSFileSystemStore {
       PutObjectResult result = ossClient.putObject(bucketName, key, fis, meta);
       LOG.debug(result.getETag());
       statistics.incrementWriteOps(1);
+      statistics.incrementBytesWritten(file.length());
     } finally {
       fis.close();
     }
@@ -449,7 +458,9 @@ public class AliyunOSSFileSystemStore {
     try {
       GetObjectRequest request = new GetObjectRequest(bucketName, key);
       request.setRange(byteStart, byteEnd);
-      return ossClient.getObject(request).getObjectContent();
+      InputStream in = ossClient.getObject(request).getObjectContent();
+      statistics.incrementReadOps(1);
+      return in;
     } catch (OSSException | ClientException e) {
       LOG.error("Exception thrown when store retrieves key: "
               + key + ", exception: " + e);
@@ -480,6 +491,7 @@ public class AliyunOSSFileSystemStore {
       for (OSSObjectSummary object : objects.getObjectSummaries()) {
         key = object.getKey();
         ossClient.deleteObject(bucketName, key);
+        statistics.incrementWriteOps(1);
       }
 
       for (String dir: objects.getCommonPrefixes()) {
@@ -605,6 +617,8 @@ public class AliyunOSSFileSystemStore {
         uploadRequest.setPartSize(file.length());
         uploadRequest.setPartNumber(idx);
         UploadPartResult uploadResult = ossClient.uploadPart(uploadRequest);
+        statistics.incrementWriteOps(1);
+        statistics.incrementBytesWritten(file.length());
         return uploadResult.getPartETag();
       } catch (Exception e) {
         LOG.debug("Failed to upload "+ file.getPath() +", " +

+ 5 - 0
hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md

@@ -117,6 +117,11 @@ please raise your issues with them.
        </description>
     </property>
 
+    <property>
+    <name>fs.oss.impl</name>
+    <value>org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem</value>
+    </property>
+
     <property>
       <name>fs.oss.assumed.role.arn</name>
       <description>

+ 60 - 10
hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java

@@ -32,6 +32,7 @@ import java.io.IOException;
 
 import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
+import static org.junit.Assert.assertEquals;
 
 /**
  * Tests regular and multi-part upload functionality for
@@ -74,24 +75,73 @@ public class TestAliyunOSSBlockOutputStream {
 
   @Test
   public void testRegularUpload() throws IOException {
-    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024 - 1);
-    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024);
-    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024 + 1);
+    long size = 1024 * 1024;
+    FileSystem.Statistics statistics =
+        FileSystem.getStatistics("oss", AliyunOSSFileSystem.class);
+    // This test is a little complicated for statistics, lifecycle is
+    // generateTestFile
+    //   fs.create(getFileStatus)    read 1
+    //   output stream write         write 1
+    // path exists(fs.exists)        read 1
+    // verifyReceivedData
+    //   fs.open(getFileStatus)      read 1
+    //   input stream read           read 2(part size is 512K)
+    // fs.delete
+    //   getFileStatus & delete & exists & create fake dir read 2, write 2
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size - 1);
+    assertEquals(7, statistics.getReadOps());
+    assertEquals(size - 1, statistics.getBytesRead());
+    assertEquals(3, statistics.getWriteOps());
+    assertEquals(size - 1, statistics.getBytesWritten());
+
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size);
+    assertEquals(14, statistics.getReadOps());
+    assertEquals(2 * size - 1, statistics.getBytesRead());
+    assertEquals(6, statistics.getWriteOps());
+    assertEquals(2 * size - 1, statistics.getBytesWritten());
+
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size + 1);
+
+    assertEquals(22, statistics.getReadOps());
+    assertEquals(3 * size, statistics.getBytesRead());
+    assertEquals(10, statistics.getWriteOps());
+    assertEquals(3 * size, statistics.getBytesWritten());
   }
 
   @Test
   public void testMultiPartUpload() throws IOException {
-    ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
-        6 * 1024 * 1024 - 1);
-    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 * 1024);
-    ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
-        6 * 1024 * 1024 + 1);
+    long size = 6 * 1024 * 1024;
+    FileSystem.Statistics statistics =
+        FileSystem.getStatistics("oss", AliyunOSSFileSystem.class);
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size - 1);
+    assertEquals(17, statistics.getReadOps());
+    assertEquals(size - 1, statistics.getBytesRead());
+    assertEquals(8, statistics.getWriteOps());
+    assertEquals(size - 1, statistics.getBytesWritten());
+
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size);
+    assertEquals(34, statistics.getReadOps());
+    assertEquals(2 * size - 1, statistics.getBytesRead());
+    assertEquals(16, statistics.getWriteOps());
+    assertEquals(2 * size - 1, statistics.getBytesWritten());
+
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size + 1);
+    assertEquals(52, statistics.getReadOps());
+    assertEquals(3 * size, statistics.getBytesRead());
+    assertEquals(25, statistics.getWriteOps());
+    assertEquals(3 * size, statistics.getBytesWritten());
   }
 
   @Test
   public void testMultiPartUploadConcurrent() throws IOException {
-    ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
-        50 * 1024 * 1024 - 1);
+    long size = 50 * 1024 * 1024 - 1;
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size);
+    FileSystem.Statistics statistics =
+        FileSystem.getStatistics("oss", AliyunOSSFileSystem.class);
+    assertEquals(105, statistics.getReadOps());
+    assertEquals(size, statistics.getBytesRead());
+    assertEquals(52, statistics.getWriteOps());
+    assertEquals(size, statistics.getBytesWritten());
   }
 
   @Test