Ver código fonte

HADOOP-16191. AliyunOSS: improvements for copyFile/copyDirectory and logging. Contributed by wujinhu.

(cherry picked from commit 568d3ab8b65d1348dec9c971feffe200e6cba2ef)
Weiwei Yang 6 anos atrás
pai
commit
f34d6b937f

+ 1 - 1
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSCopyFileTask.java

@@ -50,7 +50,7 @@ public class AliyunOSSCopyFileTask implements Runnable {
   public void run() {
     boolean fail = false;
     try {
-      store.copyFile(srcKey, srcLen, dstKey);
+      fail = !store.copyFile(srcKey, srcLen, dstKey);
     } catch (Exception e) {
       LOG.warn("Exception thrown when copy from "
           + srcKey + " to " + dstKey +  ", exception: " + e);

+ 5 - 3
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java

@@ -650,13 +650,15 @@ public class AliyunOSSFileSystem extends FileSystem {
             dstPath));
       }
     }
+
+    boolean succeed;
     if (srcStatus.isDirectory()) {
-      copyDirectory(srcPath, dstPath);
+      succeed = copyDirectory(srcPath, dstPath);
     } else {
-      copyFile(srcPath, srcStatus.getLen(), dstPath);
+      succeed = copyFile(srcPath, srcStatus.getLen(), dstPath);
     }
 
-    return srcPath.equals(dstPath) || delete(srcPath, true);
+    return srcPath.equals(dstPath) || (succeed && delete(srcPath, true));
   }
 
   /**

+ 5 - 2
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java

@@ -31,6 +31,7 @@ import com.aliyun.oss.model.CompleteMultipartUploadResult;
 import com.aliyun.oss.model.CopyObjectResult;
 import com.aliyun.oss.model.DeleteObjectsRequest;
 import com.aliyun.oss.model.DeleteObjectsResult;
+import com.aliyun.oss.model.GenericRequest;
 import com.aliyun.oss.model.GetObjectRequest;
 import com.aliyun.oss.model.InitiateMultipartUploadRequest;
 import com.aliyun.oss.model.InitiateMultipartUploadResult;
@@ -260,11 +261,13 @@ public class AliyunOSSFileSystemStore {
    */
   public ObjectMetadata getObjectMetadata(String key) {
     try {
-      ObjectMetadata objectMeta = ossClient.getObjectMetadata(bucketName, key);
+      GenericRequest request = new GenericRequest(bucketName, key);
+      request.setLogEnabled(false);
+      ObjectMetadata objectMeta = ossClient.getObjectMetadata(request);
       statistics.incrementReadOps(1);
       return objectMeta;
     } catch (OSSException osse) {
-      LOG.error("Exception thrown when get object meta: "
+      LOG.debug("Exception thrown when get object meta: "
               + key + ", exception: " + osse);
       return null;
     }

+ 76 - 0
hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.aliyun.oss;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemContractBaseTest;
 import org.apache.hadoop.fs.Path;
 
@@ -359,4 +360,79 @@ public class TestAliyunOSSFileSystemContract
     }
   }
 
+  @Test
+  public void testRenameChangingDirShouldFail() throws Exception {
+    testRenameDir(true, false, false);
+    testRenameDir(true, true, true);
+  }
+
+  @Test
+  public void testRenameDir() throws Exception {
+    testRenameDir(false, true, false);
+    testRenameDir(false, true, true);
+  }
+
+  private void testRenameDir(boolean changing, boolean result, boolean empty)
+      throws Exception {
+    fs.getConf().setLong(Constants.FS_OSS_BLOCK_SIZE_KEY, 1024);
+    String key = "a/b/test.file";
+    for (int i = 0; i < 100; i++) {
+      if (empty) {
+        fs.createNewFile(this.path(key + "." + i));
+      } else {
+        createFile(this.path(key + "." + i));
+      }
+    }
+
+    Path srcPath = this.path("a");
+    Path dstPath = this.path("b");
+    TestRenameTask task = new TestRenameTask(fs, srcPath, dstPath);
+    Thread thread = new Thread(task);
+    thread.start();
+    while (!task.isRunning()) {
+      Thread.sleep(1000);
+    }
+
+    if (changing) {
+      fs.delete(this.path("a/b"), true);
+    }
+
+    thread.join();
+    assertEquals(result, task.isSucceed());
+  }
+
+  class TestRenameTask implements Runnable {
+    private FileSystem fs;
+    private Path srcPath;
+    private Path dstPath;
+    private boolean result;
+    private boolean running;
+    TestRenameTask(FileSystem fs, Path srcPath, Path dstPath) {
+      this.fs = fs;
+      this.srcPath = srcPath;
+      this.dstPath = dstPath;
+      this.result = false;
+      this.running = false;
+    }
+
+    boolean isSucceed() {
+      return this.result;
+    }
+
+    boolean isRunning() {
+      return this.running;
+    }
+    @Override
+    public void run() {
+      try {
+        running = true;
+        result = fs.rename(srcPath, dstPath);
+      } catch (Exception e) {
+      }
+    }
+  }
+
+  protected int getGlobalTimeout() {
+    return 120 * 1000;
+  }
 }