
HADOOP-13529. Do some code refactoring. Contributed by Genmao Yu.

Mingfei 8 years ago
Commit
d33e928fbe

+ 22 - 1
hadoop-tools/hadoop-aliyun/pom.xml

@@ -128,6 +128,27 @@
       <scope>test</scope>
       <type>test-jar</type>
     </dependency>
-
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-distcp</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-distcp</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

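The four test-scoped dependencies added above (hadoop-distcp plus its test-jar, hadoop-yarn-server-tests, and hadoop-mapreduce-client-jobclient) let the module run DistCp- and MapReduce-based integration tests against a real bucket. As a rough sketch only: the snippet below drives DistCp programmatically into OSS, assuming the pre-builder DistCpOptions(List<Path>, Path) constructor of this era and a placeholder bucket/endpoint supplied through the usual fs.oss.* keys.

    import java.util.Collections;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.tools.DistCp;
    import org.apache.hadoop.tools.DistCpOptions;

    /** Hypothetical smoke test: copy an HDFS tree into OSS with DistCp. */
    public class OssDistCpSmoke {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Endpoint and credentials are expected in the fs.oss.* configuration keys.
        Path src = new Path("hdfs:///tmp/distcp-src");              // placeholder
        Path dst = new Path("oss://example-bucket/distcp-dst");     // placeholder
        DistCpOptions options =
            new DistCpOptions(Collections.singletonList(src), dst);
        Job job = new DistCp(conf, options).execute();              // runs the MR job
        System.out.println("DistCp successful: " + job.isSuccessful());
      }
    }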
+ 72 - 383
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java

@@ -20,18 +20,12 @@ package org.apache.hadoop.fs.aliyun.oss;
 
 import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
 
-import com.aliyun.oss.ClientException;
-import com.aliyun.oss.common.auth.CredentialsProvider;
-import com.aliyun.oss.common.auth.DefaultCredentialProvider;
-import com.aliyun.oss.common.auth.DefaultCredentials;
-import java.io.ByteArrayInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -39,30 +33,13 @@ import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.UserInfo;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.ProviderUtils;
 import org.apache.hadoop.util.Progressable;
 
-import com.aliyun.oss.ClientConfiguration;
-import com.aliyun.oss.OSSClient;
-import com.aliyun.oss.OSSException;
-import com.aliyun.oss.common.comm.Protocol;
-import com.aliyun.oss.model.AbortMultipartUploadRequest;
-import com.aliyun.oss.model.CannedAccessControlList;
-import com.aliyun.oss.model.CompleteMultipartUploadRequest;
-import com.aliyun.oss.model.CompleteMultipartUploadResult;
-import com.aliyun.oss.model.CopyObjectResult;
-import com.aliyun.oss.model.DeleteObjectsRequest;
-import com.aliyun.oss.model.InitiateMultipartUploadRequest;
-import com.aliyun.oss.model.InitiateMultipartUploadResult;
-import com.aliyun.oss.model.ListObjectsRequest;
 import com.aliyun.oss.model.OSSObjectSummary;
 import com.aliyun.oss.model.ObjectListing;
 import com.aliyun.oss.model.ObjectMetadata;
-import com.aliyun.oss.model.PartETag;
-import com.aliyun.oss.model.UploadPartCopyRequest;
-import com.aliyun.oss.model.UploadPartCopyResult;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,12 +52,8 @@ public class AliyunOSSFileSystem extends FileSystem {
       LoggerFactory.getLogger(AliyunOSSFileSystem.class);
   private URI uri;
   private Path workingDir;
-  private OSSClient ossClient;
-  private String bucketName;
-  private long uploadPartSize;
-  private long multipartThreshold;
+  private AliyunOSSFileSystemStore store;
   private int maxKeys;
-  private String serverSideEncryptionAlgorithm;
 
   @Override
   public FSDataOutputStream append(Path path, int bufferSize,
@@ -91,9 +64,7 @@ public class AliyunOSSFileSystem extends FileSystem {
   @Override
   public void close() throws IOException {
     try {
-      if (ossClient != null) {
-        ossClient.shutdown();
-      }
+      store.close();
     } finally {
       super.close();
     }
@@ -125,23 +96,33 @@ public class AliyunOSSFileSystem extends FileSystem {
     }
 
     return new FSDataOutputStream(new AliyunOSSOutputStream(getConf(),
-        ossClient, bucketName, key, progress, statistics,
-        serverSideEncryptionAlgorithm), (Statistics)(null));
+        store, key, progress, statistics), (Statistics)(null));
   }
 
   @Override
   public boolean delete(Path path, boolean recursive) throws IOException {
-    FileStatus status;
     try {
-      status = getFileStatus(path);
+      return innerDelete(getFileStatus(path), recursive);
     } catch (FileNotFoundException e) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Couldn't delete " + path + ": Does not exist!");
-      }
+      LOG.debug("Couldn't delete {} - does not exist", path);
       return false;
     }
+  }
 
-    String key = pathToKey(status.getPath());
+  /**
+   * Delete an object. See {@link #delete(Path, boolean)}.
+   *
+   * @param status fileStatus object
+   * @param recursive if path is a directory and set to
+   * true, the directory is deleted else throws an exception. In
+   * case of a file the recursive can be set to either true or false.
+   * @return  true if delete is successful else false.
+   * @throws IOException due to inability to delete a directory or file.
+   */
+  private boolean innerDelete(FileStatus status, boolean recursive)
+      throws IOException {
+    Path f = status.getPath();
+    String key = pathToKey(f);
     if (status.isDirectory()) {
       if (!key.endsWith("/")) {
         key += "/";
@@ -150,54 +131,34 @@ public class AliyunOSSFileSystem extends FileSystem {
         FileStatus[] statuses = listStatus(status.getPath());
         // Check whether it is an empty directory or not
         if (statuses.length > 0) {
-          throw new IOException("Cannot remove directory" + path +
+          throw new IOException("Cannot remove directory " + f +
               ": It is not empty!");
         } else {
           // Delete empty directory without '-r'
-          ossClient.deleteObject(bucketName, key);
-          statistics.incrementWriteOps(1);
+          store.deleteObject(key);
         }
       } else {
-        ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
-        listRequest.setPrefix(key);
-        listRequest.setMaxKeys(maxKeys);
-
-        while (true) {
-          ObjectListing objects = ossClient.listObjects(listRequest);
-          statistics.incrementReadOps(1);
-          List<String> keysToDelete = new ArrayList<String>();
-          for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
-            keysToDelete.add(objectSummary.getKey());
-          }
-          DeleteObjectsRequest deleteRequest =
-              new DeleteObjectsRequest(bucketName);
-          deleteRequest.setKeys(keysToDelete);
-          ossClient.deleteObjects(deleteRequest);
-          statistics.incrementWriteOps(1);
-          if (objects.isTruncated()) {
-            listRequest.setMarker(objects.getNextMarker());
-          } else {
-            break;
-          }
-        }
+        store.deleteDirs(key);
       }
     } else {
-      ossClient.deleteObject(bucketName, key);
-      statistics.incrementWriteOps(1);
+      store.deleteObject(key);
     }
-    //TODO: optimize logic here
+
+    createFakeDirectoryIfNecessary(f);
+    return true;
+  }
+
+  private void createFakeDirectoryIfNecessary(Path f) throws IOException {
     try {
-      Path pPath = status.getPath().getParent();
+      Path pPath = f.getParent();
       FileStatus pStatus = getFileStatus(pPath);
-      if (pStatus.isDirectory()) {
-        return true;
-      } else {
+      if (pStatus.isFile()) {
         throw new IOException("Path " + pPath +
             " is assumed to be a directory!");
       }
     } catch (FileNotFoundException fnfe) {
       // Make sure the parent directory exists
-      return mkdir(bucketName, pathToKey(status.getPath().getParent()));
+      mkdir(pathToKey(f.getParent()));
     }
   }
 
@@ -211,22 +172,15 @@ public class AliyunOSSFileSystem extends FileSystem {
       return new FileStatus(0, true, 1, 0, 0, qualifiedPath);
     }
 
-    ObjectMetadata meta = getObjectMetadata(key);
+    ObjectMetadata meta = store.getObjectMetadata(key);
     // If key not found and key does not end with "/"
     if (meta == null && !key.endsWith("/")) {
       // Case: dir + "/"
       key += "/";
-      meta = getObjectMetadata(key);
+      meta = store.getObjectMetadata(key);
     }
     if (meta == null) {
-      // Case: dir + "/" + file
-      ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
-      listRequest.setPrefix(key);
-      listRequest.setDelimiter("/");
-      listRequest.setMaxKeys(1);
-
-      ObjectListing listing = ossClient.listObjects(listRequest);
-      statistics.incrementReadOps(1);
+      ObjectListing listing = store.listObjects(key, 1, "/", null);
       if (!listing.getObjectSummaries().isEmpty() ||
           !listing.getCommonPrefixes().isEmpty()) {
         return new FileStatus(0, true, 1, 0, 0, qualifiedPath);
@@ -242,22 +196,6 @@ public class AliyunOSSFileSystem extends FileSystem {
     }
   }
 
-  /**
-   * Return object metadata given object key.
-   *
-   * @param key object key
-   * @return return null if key does not exist
-   */
-  private ObjectMetadata getObjectMetadata(String key) {
-    try {
-      return ossClient.getObjectMetadata(bucketName, key);
-    } catch (OSSException osse) {
-      return null;
-    } finally {
-      statistics.incrementReadOps(1);
-    }
-  }
-
   @Override
   public String getScheme() {
     return "oss";
@@ -288,7 +226,6 @@ public class AliyunOSSFileSystem extends FileSystem {
    * Initialize new FileSystem.
    *
    * @param name the uri of the file system, including host, port, etc.
-   *
    * @param conf configuration of the file system
    * @throws IOException IO problems
    */
@@ -296,153 +233,15 @@ public class AliyunOSSFileSystem extends FileSystem {
     super.initialize(name, conf);
 
     uri = java.net.URI.create(name.getScheme() + "://" + name.getAuthority());
-    workingDir =
-        new Path("/user",
-            System.getProperty("user.name")).makeQualified(uri, null);
-
-    bucketName = name.getHost();
-
-    ClientConfiguration clientConf = new ClientConfiguration();
-    clientConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS_KEY,
-        MAXIMUM_CONNECTIONS_DEFAULT));
-    boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS_KEY,
-        SECURE_CONNECTIONS_DEFAULT);
-    clientConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
-    clientConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES_KEY,
-        MAX_ERROR_RETRIES_DEFAULT));
-    clientConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT_KEY,
-        ESTABLISH_TIMEOUT_DEFAULT));
-    clientConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT_KEY,
-        SOCKET_TIMEOUT_DEFAULT));
-
-    String proxyHost = conf.getTrimmed(PROXY_HOST_KEY, "");
-    int proxyPort = conf.getInt(PROXY_PORT_KEY, -1);
-    if (!proxyHost.isEmpty()) {
-      clientConf.setProxyHost(proxyHost);
-      if (proxyPort >= 0) {
-        clientConf.setProxyPort(proxyPort);
-      } else {
-        if (secureConnections) {
-          LOG.warn("Proxy host set without port. Using HTTPS default 443");
-          clientConf.setProxyPort(443);
-        } else {
-          LOG.warn("Proxy host set without port. Using HTTP default 80");
-          clientConf.setProxyPort(80);
-        }
-      }
-      String proxyUsername = conf.getTrimmed(PROXY_USERNAME_KEY);
-      String proxyPassword = conf.getTrimmed(PROXY_PASSWORD_KEY);
-      if ((proxyUsername == null) != (proxyPassword == null)) {
-        String msg = "Proxy error: " + PROXY_USERNAME_KEY + " or " +
-            PROXY_PASSWORD_KEY + " set without the other.";
-        LOG.error(msg);
-        throw new IllegalArgumentException(msg);
-      }
-      clientConf.setProxyUsername(proxyUsername);
-      clientConf.setProxyPassword(proxyPassword);
-      clientConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN_KEY));
-      clientConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION_KEY));
-    } else if (proxyPort >= 0) {
-      String msg = "Proxy error: " + PROXY_PORT_KEY + " set without " +
-          PROXY_HOST_KEY;
-      LOG.error(msg);
-      throw new IllegalArgumentException(msg);
-    }
-
-    String endPoint = conf.getTrimmed(ENDPOINT_KEY, "");
-    ossClient =
-        new OSSClient(endPoint, getCredentialsProvider(name, conf), clientConf);
+    workingDir = new Path("/user",
+        System.getProperty("user.name")).makeQualified(uri, null);
 
+    store = new AliyunOSSFileSystemStore();
+    store.initialize(name, conf, statistics);
     maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
-    uploadPartSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
-        MULTIPART_UPLOAD_SIZE_DEFAULT);
-    multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
-        MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
-
-    if (uploadPartSize < 5 * 1024 * 1024) {
-      LOG.warn(MULTIPART_UPLOAD_SIZE_KEY + " must be at least 5 MB");
-      uploadPartSize = 5 * 1024 * 1024;
-    }
-
-    if (multipartThreshold < 5 * 1024 * 1024) {
-      LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB");
-      multipartThreshold = 5 * 1024 * 1024;
-    }
-
-    if (multipartThreshold > 1024 * 1024 * 1024) {
-      LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be less than 1 GB");
-      multipartThreshold = 1024 * 1024 * 1024;
-    }
-
-    String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT);
-    if (!cannedACLName.isEmpty()) {
-      CannedAccessControlList cannedACL =
-          CannedAccessControlList.valueOf(cannedACLName);
-      ossClient.setBucketAcl(bucketName, cannedACL);
-    }
-
-    serverSideEncryptionAlgorithm =
-        conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, "");
-
     setConf(conf);
   }
 
-  /**
-   * Create the default credential provider, or load in one explicitly
-   * identified in the configuration.
-   * @param name the uri of the file system
-   * @param conf configuration
-   * @return a credential provider
-   * @throws IOException on any problem. Class construction issues may be
-   * nested inside the IOE.
-   */
-  private CredentialsProvider getCredentialsProvider(URI name,
-      Configuration conf) throws IOException {
-    CredentialsProvider credentials;
-
-    String className = conf.getTrimmed(ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY);
-    if (StringUtils.isEmpty(className)) {
-      Configuration newConf =
-          ProviderUtils.excludeIncompatibleCredentialProviders(conf,
-              AliyunOSSFileSystem.class);
-      String accessKey =
-          AliyunOSSUtils.getPassword(newConf, ACCESS_KEY,
-              UserInfo.EMPTY.getUser());
-      String secretKey =
-          AliyunOSSUtils.getPassword(newConf, SECRET_KEY,
-              UserInfo.EMPTY.getPassword());
-      credentials =
-          new DefaultCredentialProvider(
-              new DefaultCredentials(accessKey, secretKey));
-
-    } else {
-      try {
-        LOG.debug("Credential provider class is:" + className);
-        Class<?> credClass = Class.forName(className);
-        try {
-          credentials =
-              (CredentialsProvider)credClass.getDeclaredConstructor(
-                  URI.class, Configuration.class).newInstance(this.uri, conf);
-        } catch (NoSuchMethodException | SecurityException e) {
-          credentials =
-              (CredentialsProvider)credClass.getDeclaredConstructor()
-              .newInstance();
-        }
-      } catch (ClassNotFoundException e) {
-        throw new IOException(className + " not found.", e);
-      } catch (NoSuchMethodException | SecurityException e) {
-        throw new IOException(String.format("%s constructor exception.  A " +
-            "class specified in %s must provide an accessible constructor " +
-            "accepting URI and Configuration, or an accessible default " +
-            "constructor.", className, ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY), e);
-      } catch (ReflectiveOperationException | IllegalArgumentException e) {
-        throw new IOException(className + " instantiation exception.", e);
-      }
-    }
-
-    return credentials;
-  }
-
   /**
    * Check if OSS object represents a directory.
    *
@@ -456,10 +255,10 @@ public class AliyunOSSFileSystem extends FileSystem {
   }
 
   /**
-   * Turns a path (relative or otherwise) into an OSS key.
+   * Turn a path (relative or otherwise) into an OSS key.
    *
-   * @param path the path of the file
-   * @return the key of the object that represent the file
+   * @param path the path of the file.
+   * @return the key of the object that represents the file.
    */
   private String pathToKey(Path path) {
     if (!path.isAbsolute()) {
@@ -492,18 +291,12 @@ public class AliyunOSSFileSystem extends FileSystem {
         key = key + "/";
       }
 
-      ListObjectsRequest listObjectsRequest =
-          new ListObjectsRequest(bucketName);
-      listObjectsRequest.setPrefix(key);
-      listObjectsRequest.setDelimiter("/");
-      listObjectsRequest.setMaxKeys(maxKeys);
-
       if (LOG.isDebugEnabled()) {
         LOG.debug("listStatus: doing listObjects for directory " + key);
       }
 
+      ObjectListing objects = store.listObjects(key, maxKeys, "/", null);
       while (true) {
-        ObjectListing objects = ossClient.listObjects(listObjectsRequest);
         statistics.incrementReadOps(1);
         for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
           Path keyPath = keyToPath(objectSummary.getKey())
@@ -539,7 +332,8 @@ public class AliyunOSSFileSystem extends FileSystem {
           if (LOG.isDebugEnabled()) {
             LOG.debug("listStatus: list truncated - getting next batch");
           }
-          listObjectsRequest.setMarker(objects.getNextMarker());
+          objects = store.listObjects(key, maxKeys, "/",
+              objects.getNextMarker());
           statistics.incrementReadOps(1);
         } else {
           break;
@@ -558,27 +352,17 @@ public class AliyunOSSFileSystem extends FileSystem {
   /**
    * Used to create an empty file that represents an empty directory.
    *
-   * @param bucket the bucket this directory belongs to
    * @param key directory path
-   * @return true if directory successfully created
+   * @return true if directory is successfully created
    * @throws IOException
    */
-  private boolean mkdir(final String bucket, final String key)
-      throws IOException {
+  private boolean mkdir(final String key) throws IOException {
     String dirName = key;
-    ObjectMetadata dirMeta = new ObjectMetadata();
-    byte[] buffer = new byte[0];
-    ByteArrayInputStream in = new ByteArrayInputStream(buffer);
-    dirMeta.setContentLength(0);
     if (!key.endsWith("/")) {
       dirName += "/";
     }
-    try {
-      ossClient.putObject(bucket, dirName, in, dirMeta);
-      return true;
-    } finally {
-      in.close();
-    }
+    store.storeEmptyFile(dirName);
+    return true;
   }
 
   @Override
@@ -595,14 +379,14 @@ public class AliyunOSSFileSystem extends FileSystem {
     } catch (FileNotFoundException e) {
       validatePath(path);
       String key = pathToKey(path);
-      return mkdir(bucketName, key);
+      return mkdir(key);
     }
   }
 
   /**
    * Check whether the path is a valid path.
    *
-   * @param path the path to be checked
+   * @param path the path to be checked.
    * @throws IOException
    */
   private void validatePath(Path path) throws IOException {
@@ -631,8 +415,8 @@ public class AliyunOSSFileSystem extends FileSystem {
           " because it is a directory");
     }
 
-    return new FSDataInputStream(new AliyunOSSInputStream(getConf(), ossClient,
-        bucketName, pathToKey(path), fileStatus.getLen(), statistics));
+    return new FSDataInputStream(new AliyunOSSInputStream(getConf(), store,
+        pathToKey(path), fileStatus.getLen(), statistics));
   }
 
   @Override
@@ -696,126 +480,31 @@ public class AliyunOSSFileSystem extends FileSystem {
     } else {
       copyFile(srcPath, dstPath);
     }
-    if (srcPath.equals(dstPath)) {
-      return true;
-    } else {
-      return delete(srcPath, true);
-    }
+
+    return srcPath.equals(dstPath) || delete(srcPath, true);
   }
 
   /**
    * Copy file from source path to destination path.
-   * (the caller should make sure srcPath is a file and dstPath is valid.)
+   * (the caller should make sure srcPath is a file and dstPath is valid)
    *
-   * @param srcPath source path
-   * @param dstPath destination path
-   * @return true if successfully copied
+   * @param srcPath source path.
+   * @param dstPath destination path.
+   * @return true if file is successfully copied.
    */
   private boolean copyFile(Path srcPath, Path dstPath) {
     String srcKey = pathToKey(srcPath);
     String dstKey = pathToKey(dstPath);
-    return copyFile(srcKey, dstKey);
-  }
-
-  /**
-   * Copy an object from source key to destination key.
-   *
-   * @param srcKey source key
-   * @param dstKey destination key
-   * @return true if successfully copied
-   */
-  private boolean copyFile(String srcKey, String dstKey) {
-    ObjectMetadata objectMeta =
-        ossClient.getObjectMetadata(bucketName, srcKey);
-    long dataLen = objectMeta.getContentLength();
-    if (dataLen <= multipartThreshold) {
-      return singleCopy(srcKey, dstKey);
-    } else {
-      return multipartCopy(srcKey, dataLen, dstKey);
-    }
-  }
-
-  /**
-   * Use single copy to copy an oss object.
-   *
-   * @param srcKey source key
-   * @param dstKey destination key
-   * @return true if successfully copied
-   * (the caller should make sure srcPath is a file and dstPath is valid)
-   */
-  private boolean singleCopy(String srcKey, String dstKey) {
-    CopyObjectResult copyResult =
-        ossClient.copyObject(bucketName, srcKey, bucketName, dstKey);
-    LOG.debug(copyResult.getETag());
-    return true;
-  }
-
-  /**
-   * Use multipart copy to copy an oss object.
-   * (the caller should make sure srcPath is a file and dstPath is valid)
-   *
-   * @param srcKey source key
-   * @param dataLen data size of the object to copy
-   * @param dstKey destination key
-   * @return true if successfully copied, or false if upload is aborted
-   */
-  private boolean multipartCopy(String srcKey, long dataLen, String dstKey) {
-    int partNum = (int)(dataLen / uploadPartSize);
-    if (dataLen % uploadPartSize != 0) {
-      partNum++;
-    }
-    InitiateMultipartUploadRequest initiateMultipartUploadRequest =
-        new InitiateMultipartUploadRequest(bucketName, dstKey);
-    ObjectMetadata meta = new ObjectMetadata();
-    if (!serverSideEncryptionAlgorithm.isEmpty()) {
-      meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
-    }
-    initiateMultipartUploadRequest.setObjectMetadata(meta);
-    InitiateMultipartUploadResult initiateMultipartUploadResult =
-        ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
-    String uploadId = initiateMultipartUploadResult.getUploadId();
-    List<PartETag> partETags = new ArrayList<PartETag>();
-    try {
-      for (int i = 0; i < partNum; i++) {
-        long skipBytes = uploadPartSize * i;
-        long size = (uploadPartSize < dataLen - skipBytes) ?
-            uploadPartSize : dataLen - skipBytes;
-        UploadPartCopyRequest partCopyRequest = new UploadPartCopyRequest();
-        partCopyRequest.setSourceBucketName(bucketName);
-        partCopyRequest.setSourceKey(srcKey);
-        partCopyRequest.setBucketName(bucketName);
-        partCopyRequest.setKey(dstKey);
-        partCopyRequest.setUploadId(uploadId);
-        partCopyRequest.setPartSize(size);
-        partCopyRequest.setBeginIndex(skipBytes);
-        partCopyRequest.setPartNumber(i + 1);
-        UploadPartCopyResult partCopyResult =
-            ossClient.uploadPartCopy(partCopyRequest);
-        statistics.incrementWriteOps(1);
-        partETags.add(partCopyResult.getPartETag());
-      }
-      CompleteMultipartUploadRequest completeMultipartUploadRequest =
-          new CompleteMultipartUploadRequest(bucketName, dstKey,
-          uploadId, partETags);
-      CompleteMultipartUploadResult completeMultipartUploadResult =
-          ossClient.completeMultipartUpload(completeMultipartUploadRequest);
-      LOG.debug(completeMultipartUploadResult.getETag());
-      return true;
-    } catch (OSSException | ClientException e) {
-      AbortMultipartUploadRequest abortMultipartUploadRequest =
-          new AbortMultipartUploadRequest(bucketName, dstKey, uploadId);
-      ossClient.abortMultipartUpload(abortMultipartUploadRequest);
-      return false;
-    }
+    return store.copyFile(srcKey, dstKey);
   }
 
   /**
    * Copy a directory from source path to destination path.
    * (the caller should make sure srcPath is a directory, and dstPath is valid)
    *
-   * @param srcPath source path
-   * @param dstPath destination path
-   * @return true if successfully copied
+   * @param srcPath source path.
+   * @param dstPath destination path.
+   * @return true if directory is successfully copied.
    */
   private boolean copyDirectory(Path srcPath, Path dstPath) {
     String srcKey = pathToKey(srcPath);
@@ -835,21 +524,18 @@ public class AliyunOSSFileSystem extends FileSystem {
       return false;
     }
 
-    ListObjectsRequest listObjectsRequest = new ListObjectsRequest(bucketName);
-    listObjectsRequest.setPrefix(srcKey);
-    listObjectsRequest.setMaxKeys(maxKeys);
-
-    ObjectListing objects = ossClient.listObjects(listObjectsRequest);
+    ObjectListing objects = store.listObjects(srcKey, maxKeys, null, null);
     statistics.incrementReadOps(1);
     // Copy files from src folder to dst
     while (true) {
       for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
         String newKey =
             dstKey.concat(objectSummary.getKey().substring(srcKey.length()));
-        copyFile(objectSummary.getKey(), newKey);
+        store.copyFile(objectSummary.getKey(), newKey);
       }
       if (objects.isTruncated()) {
-        listObjectsRequest.setMarker(objects.getNextMarker());
+        objects = store.listObjects(srcKey, maxKeys, null,
+            objects.getNextMarker());
         statistics.incrementReadOps(1);
       } else {
         break;
@@ -863,4 +549,7 @@ public class AliyunOSSFileSystem extends FileSystem {
     this.workingDir = dir;
   }
 
+  public AliyunOSSFileSystemStore getStore() {
+    return store;
+  }
 }

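After this refactoring, AliyunOSSFileSystem keeps only path/key translation and directory semantics, and delegates every OSS call to the new AliyunOSSFileSystemStore, which the new getStore() accessor exposes (mainly for tests). A minimal usage sketch with a placeholder bucket; the fs.oss.impl mapping is set explicitly in case it is not yet in core-default:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem;
    import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore;

    public class OssFileSystemSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setIfUnset("fs.oss.impl", AliyunOSSFileSystem.class.getName());
        FileSystem fs = FileSystem.get(URI.create("oss://example-bucket/"), conf);

        fs.mkdirs(new Path("/user/demo"));             // stored as empty object "user/demo/"
        for (FileStatus status : fs.listStatus(new Path("/user/demo"))) {
          System.out.println(status.getPath());
        }

        // The low-level store is now reachable for tests and tooling.
        AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem) fs).getStore();
        System.out.println(store.getObjectMetadata("user/demo/") != null);
        fs.close();
      }
    }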
+ 486 - 0
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java

@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.aliyun.oss;
+
+import com.aliyun.oss.ClientConfiguration;
+import com.aliyun.oss.ClientException;
+import com.aliyun.oss.OSSClient;
+import com.aliyun.oss.OSSException;
+import com.aliyun.oss.common.auth.CredentialsProvider;
+import com.aliyun.oss.common.comm.Protocol;
+import com.aliyun.oss.model.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
+
+/**
+ * Core implementation of Aliyun OSS Filesystem for Hadoop.
+ * Provides the bridging logic between Hadoop's abstract filesystem and
+ * Aliyun OSS.
+ */
+public class AliyunOSSFileSystemStore {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(AliyunOSSFileSystemStore.class);
+  private FileSystem.Statistics statistics;
+  private OSSClient ossClient;
+  private String bucketName;
+  private long uploadPartSize;
+  private long multipartThreshold;
+  private long partSize;
+  private int maxKeys;
+  private String serverSideEncryptionAlgorithm;
+
+  public void initialize(URI uri, Configuration conf,
+                         FileSystem.Statistics stat) throws IOException {
+    statistics = stat;
+    ClientConfiguration clientConf = new ClientConfiguration();
+    clientConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS_KEY,
+        MAXIMUM_CONNECTIONS_DEFAULT));
+    boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS_KEY,
+        SECURE_CONNECTIONS_DEFAULT);
+    clientConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
+    clientConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES_KEY,
+        MAX_ERROR_RETRIES_DEFAULT));
+    clientConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT_KEY,
+        ESTABLISH_TIMEOUT_DEFAULT));
+    clientConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT_KEY,
+        SOCKET_TIMEOUT_DEFAULT));
+
+    String proxyHost = conf.getTrimmed(PROXY_HOST_KEY, "");
+    int proxyPort = conf.getInt(PROXY_PORT_KEY, -1);
+    if (!proxyHost.isEmpty()) {
+      clientConf.setProxyHost(proxyHost);
+      if (proxyPort >= 0) {
+        clientConf.setProxyPort(proxyPort);
+      } else {
+        if (secureConnections) {
+          LOG.warn("Proxy host set without port. Using HTTPS default 443");
+          clientConf.setProxyPort(443);
+        } else {
+          LOG.warn("Proxy host set without port. Using HTTP default 80");
+          clientConf.setProxyPort(80);
+        }
+      }
+      String proxyUsername = conf.getTrimmed(PROXY_USERNAME_KEY);
+      String proxyPassword = conf.getTrimmed(PROXY_PASSWORD_KEY);
+      if ((proxyUsername == null) != (proxyPassword == null)) {
+        String msg = "Proxy error: " + PROXY_USERNAME_KEY + " or " +
+            PROXY_PASSWORD_KEY + " set without the other.";
+        LOG.error(msg);
+        throw new IllegalArgumentException(msg);
+      }
+      clientConf.setProxyUsername(proxyUsername);
+      clientConf.setProxyPassword(proxyPassword);
+      clientConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN_KEY));
+      clientConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION_KEY));
+    } else if (proxyPort >= 0) {
+      String msg = "Proxy error: " + PROXY_PORT_KEY + " set without " +
+          PROXY_HOST_KEY;
+      LOG.error(msg);
+      throw new IllegalArgumentException(msg);
+    }
+
+    String endPoint = conf.getTrimmed(ENDPOINT_KEY, "");
+    CredentialsProvider provider =
+        AliyunOSSUtils.getCredentialsProvider(uri, conf);
+    ossClient = new OSSClient(endPoint, provider, clientConf);
+    uploadPartSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
+        MULTIPART_UPLOAD_SIZE_DEFAULT);
+    multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
+        MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
+    partSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
+        MULTIPART_UPLOAD_SIZE_DEFAULT);
+    if (partSize < MIN_MULTIPART_UPLOAD_PART_SIZE) {
+      partSize = MIN_MULTIPART_UPLOAD_PART_SIZE;
+    }
+    serverSideEncryptionAlgorithm =
+        conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, "");
+
+    if (uploadPartSize < 5 * 1024 * 1024) {
+      LOG.warn(MULTIPART_UPLOAD_SIZE_KEY + " must be at least 5 MB");
+      uploadPartSize = 5 * 1024 * 1024;
+    }
+
+    if (multipartThreshold < 5 * 1024 * 1024) {
+      LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB");
+      multipartThreshold = 5 * 1024 * 1024;
+    }
+
+    if (multipartThreshold > 1024 * 1024 * 1024) {
+      LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be less than 1 GB");
+      multipartThreshold = 1024 * 1024 * 1024;
+    }
+
+    String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT);
+    if (!cannedACLName.isEmpty()) {
+      CannedAccessControlList cannedACL =
+          CannedAccessControlList.valueOf(cannedACLName);
+      ossClient.setBucketAcl(bucketName, cannedACL);
+    }
+
+    maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
+    bucketName = uri.getHost();
+  }
+
+  /**
+   * Delete an object, and update write operation statistics.
+   *
+   * @param key key to blob to delete.
+   */
+  public void deleteObject(String key) {
+    ossClient.deleteObject(bucketName, key);
+    statistics.incrementWriteOps(1);
+  }
+
+  /**
+   * Delete a list of keys, and update write operation statistics.
+   *
+   * @param keysToDelete collection of keys to delete.
+   */
+  public void deleteObjects(List<String> keysToDelete) {
+    DeleteObjectsRequest deleteRequest =
+        new DeleteObjectsRequest(bucketName);
+    deleteRequest.setKeys(keysToDelete);
+    ossClient.deleteObjects(deleteRequest);
+    statistics.incrementWriteOps(keysToDelete.size());
+  }
+
+  /**
+   * Delete a directory from Aliyun OSS.
+   *
+   * @param key directory key to delete.
+   */
+  public void deleteDirs(String key) {
+    ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
+    listRequest.setPrefix(key);
+    listRequest.setMaxKeys(maxKeys);
+
+    while (true) {
+      ObjectListing objects = ossClient.listObjects(listRequest);
+      statistics.incrementReadOps(1);
+      List<String> keysToDelete = new ArrayList<String>();
+      for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
+        keysToDelete.add(objectSummary.getKey());
+      }
+      deleteObjects(keysToDelete);
+      if (objects.isTruncated()) {
+        listRequest.setMarker(objects.getNextMarker());
+      } else {
+        break;
+      }
+    }
+  }
+
+  /**
+   * Return metadata of a given object key.
+   *
+   * @param key object key.
+   * @return return null if key does not exist.
+   */
+  public ObjectMetadata getObjectMetadata(String key) {
+    try {
+      return ossClient.getObjectMetadata(bucketName, key);
+    } catch (OSSException osse) {
+      return null;
+    } finally {
+      statistics.incrementReadOps(1);
+    }
+  }
+
+  /**
+   * Upload an empty file as an OSS object, using single upload.
+   *
+   * @param key object key.
+   * @throws IOException if failed to upload object.
+   */
+  public void storeEmptyFile(String key) throws IOException {
+    ObjectMetadata dirMeta = new ObjectMetadata();
+    byte[] buffer = new byte[0];
+    ByteArrayInputStream in = new ByteArrayInputStream(buffer);
+    dirMeta.setContentLength(0);
+    try {
+      ossClient.putObject(bucketName, key, in, dirMeta);
+    } finally {
+      in.close();
+    }
+  }
+
+  /**
+   * Copy an object from source key to destination key.
+   *
+   * @param srcKey source key.
+   * @param dstKey destination key.
+   * @return true if file is successfully copied.
+   */
+  public boolean copyFile(String srcKey, String dstKey) {
+    ObjectMetadata objectMeta =
+        ossClient.getObjectMetadata(bucketName, srcKey);
+    long contentLength = objectMeta.getContentLength();
+    if (contentLength <= multipartThreshold) {
+      return singleCopy(srcKey, dstKey);
+    } else {
+      return multipartCopy(srcKey, contentLength, dstKey);
+    }
+  }
+
+  /**
+   * Use single copy to copy an OSS object.
+   * (The caller should make sure srcPath is a file and dstPath is valid)
+   *
+   * @param srcKey source key.
+   * @param dstKey destination key.
+   * @return true if object is successfully copied.
+   */
+  private boolean singleCopy(String srcKey, String dstKey) {
+    CopyObjectResult copyResult =
+        ossClient.copyObject(bucketName, srcKey, bucketName, dstKey);
+    LOG.debug(copyResult.getETag());
+    return true;
+  }
+
+  /**
+   * Use multipart copy to copy an OSS object.
+   * (The caller should make sure srcPath is a file and dstPath is valid)
+   *
+   * @param srcKey source key.
+   * @param contentLength data size of the object to copy.
+   * @param dstKey destination key.
+   * @return true if success, or false if upload is aborted.
+   */
+  private boolean multipartCopy(String srcKey, long contentLength,
+      String dstKey) {
+    long realPartSize =
+        AliyunOSSUtils.calculatePartSize(contentLength, uploadPartSize);
+    int partNum = (int) (contentLength / realPartSize);
+    if (contentLength % realPartSize != 0) {
+      partNum++;
+    }
+    InitiateMultipartUploadRequest initiateMultipartUploadRequest =
+        new InitiateMultipartUploadRequest(bucketName, dstKey);
+    ObjectMetadata meta = new ObjectMetadata();
+    if (!serverSideEncryptionAlgorithm.isEmpty()) {
+      meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
+    }
+    initiateMultipartUploadRequest.setObjectMetadata(meta);
+    InitiateMultipartUploadResult initiateMultipartUploadResult =
+        ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
+    String uploadId = initiateMultipartUploadResult.getUploadId();
+    List<PartETag> partETags = new ArrayList<PartETag>();
+    try {
+      for (int i = 0; i < partNum; i++) {
+        long skipBytes = realPartSize * i;
+        long size = (realPartSize < contentLength - skipBytes) ?
+            realPartSize : contentLength - skipBytes;
+        UploadPartCopyRequest partCopyRequest = new UploadPartCopyRequest();
+        partCopyRequest.setSourceBucketName(bucketName);
+        partCopyRequest.setSourceKey(srcKey);
+        partCopyRequest.setBucketName(bucketName);
+        partCopyRequest.setKey(dstKey);
+        partCopyRequest.setUploadId(uploadId);
+        partCopyRequest.setPartSize(size);
+        partCopyRequest.setBeginIndex(skipBytes);
+        partCopyRequest.setPartNumber(i + 1);
+        UploadPartCopyResult partCopyResult =
+            ossClient.uploadPartCopy(partCopyRequest);
+        statistics.incrementWriteOps(1);
+        partETags.add(partCopyResult.getPartETag());
+      }
+      CompleteMultipartUploadRequest completeMultipartUploadRequest =
+          new CompleteMultipartUploadRequest(bucketName, dstKey,
+              uploadId, partETags);
+      CompleteMultipartUploadResult completeMultipartUploadResult =
+          ossClient.completeMultipartUpload(completeMultipartUploadRequest);
+      LOG.debug(completeMultipartUploadResult.getETag());
+      return true;
+    } catch (OSSException | ClientException e) {
+      AbortMultipartUploadRequest abortMultipartUploadRequest =
+          new AbortMultipartUploadRequest(bucketName, dstKey, uploadId);
+      ossClient.abortMultipartUpload(abortMultipartUploadRequest);
+      return false;
+    }
+  }
+
+  /**
+   * Upload a file as an OSS object, using single upload.
+   *
+   * @param key object key.
+   * @param file local file to upload.
+   * @throws IOException if failed to upload object.
+   */
+  public void uploadObject(String key, File file) throws IOException {
+    File object = file.getAbsoluteFile();
+    FileInputStream fis = new FileInputStream(object);
+    ObjectMetadata meta = new ObjectMetadata();
+    meta.setContentLength(object.length());
+    if (!serverSideEncryptionAlgorithm.isEmpty()) {
+      meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
+    }
+    try {
+      PutObjectResult result = ossClient.putObject(bucketName, key, fis, meta);
+      LOG.debug(result.getETag());
+      statistics.incrementWriteOps(1);
+    } finally {
+      fis.close();
+    }
+  }
+
+  /**
+   * Upload a file as an OSS object, using multipart upload.
+   *
+   * @param key object key.
+   * @param file local file to upload.
+   * @throws IOException if failed to upload object.
+   */
+  public void multipartUploadObject(String key, File file) throws IOException {
+    File object = file.getAbsoluteFile();
+    long dataLen = object.length();
+    long realPartSize = AliyunOSSUtils.calculatePartSize(dataLen, partSize);
+    int partNum = (int) (dataLen / realPartSize);
+    if (dataLen % realPartSize != 0) {
+      partNum += 1;
+    }
+
+    InitiateMultipartUploadRequest initiateMultipartUploadRequest =
+        new InitiateMultipartUploadRequest(bucketName, key);
+    ObjectMetadata meta = new ObjectMetadata();
+    if (!serverSideEncryptionAlgorithm.isEmpty()) {
+      meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
+    }
+    initiateMultipartUploadRequest.setObjectMetadata(meta);
+    InitiateMultipartUploadResult initiateMultipartUploadResult =
+        ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
+    List<PartETag> partETags = new ArrayList<PartETag>();
+    String uploadId = initiateMultipartUploadResult.getUploadId();
+
+    try {
+      for (int i = 0; i < partNum; i++) {
+        // TODO: Optimize this, avoid opening the object multiple times
+        FileInputStream fis = new FileInputStream(object);
+        try {
+          long skipBytes = realPartSize * i;
+          AliyunOSSUtils.skipFully(fis, skipBytes);
+          long size = (realPartSize < dataLen - skipBytes) ?
+              realPartSize : dataLen - skipBytes;
+          UploadPartRequest uploadPartRequest = new UploadPartRequest();
+          uploadPartRequest.setBucketName(bucketName);
+          uploadPartRequest.setKey(key);
+          uploadPartRequest.setUploadId(uploadId);
+          uploadPartRequest.setInputStream(fis);
+          uploadPartRequest.setPartSize(size);
+          uploadPartRequest.setPartNumber(i + 1);
+          UploadPartResult uploadPartResult =
+              ossClient.uploadPart(uploadPartRequest);
+          statistics.incrementWriteOps(1);
+          partETags.add(uploadPartResult.getPartETag());
+        } finally {
+          fis.close();
+        }
+      }
+      CompleteMultipartUploadRequest completeMultipartUploadRequest =
+          new CompleteMultipartUploadRequest(bucketName, key,
+              uploadId, partETags);
+      CompleteMultipartUploadResult completeMultipartUploadResult =
+          ossClient.completeMultipartUpload(completeMultipartUploadRequest);
+      LOG.debug(completeMultipartUploadResult.getETag());
+    } catch (OSSException | ClientException e) {
+      AbortMultipartUploadRequest abortMultipartUploadRequest =
+          new AbortMultipartUploadRequest(bucketName, key, uploadId);
+      ossClient.abortMultipartUpload(abortMultipartUploadRequest);
+    }
+  }
+
+  /**
+   * list objects.
+   *
+   * @param prefix prefix.
+   * @param maxListingLength max no. of entries
+   * @param delimiter delimiter.
+   * @param marker last key in any previous search.
+   * @return a list of matches.
+   */
+  public ObjectListing listObjects(String prefix, int maxListingLength,
+                                   String delimiter, String marker) {
+    ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
+    listRequest.setPrefix(prefix);
+    listRequest.setDelimiter(delimiter);
+    listRequest.setMaxKeys(maxListingLength);
+    listRequest.setMarker(marker);
+
+    ObjectListing listing = ossClient.listObjects(listRequest);
+    statistics.incrementReadOps(1);
+    return listing;
+  }
+
+  /**
+   * Retrieve a part of an object.
+   *
+   * @param key the object name that is being retrieved from the Aliyun OSS.
+   * @param byteStart start position.
+   * @param byteEnd end position.
+   * @return This method returns null if the key is not found.
+   */
+  public InputStream retrieve(String key, long byteStart, long byteEnd) {
+    try {
+      GetObjectRequest request = new GetObjectRequest(bucketName, key);
+      request.setRange(byteStart, byteEnd);
+      return ossClient.getObject(request).getObjectContent();
+    } catch (OSSException | ClientException e) {
+      return null;
+    }
+  }
+
+  /**
+   * Close OSS client properly.
+   */
+  public void close() {
+    if (ossClient != null) {
+      ossClient.shutdown();
+      ossClient = null;
+    }
+  }
+
+  /**
+   * Clean up all objects matching the prefix.
+   *
+   * @param prefix Aliyun OSS object prefix.
+   */
+  public void purge(String prefix) {
+    String key;
+    try {
+      ObjectListing objects = listObjects(prefix, maxKeys, null, null);
+      for (OSSObjectSummary object : objects.getObjectSummaries()) {
+        key = object.getKey();
+        ossClient.deleteObject(bucketName, key);
+      }
+
+      for (String dir: objects.getCommonPrefixes()) {
+        deleteDirs(dir);
+      }
+    } catch (OSSException | ClientException e) {
+      LOG.error("Failed to purge " + prefix);
+    }
+  }
+}

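AliyunOSSFileSystemStore centralizes listing, deletion, copy, and upload, so every paginated consumer in this patch (listStatus, copyDirectory, deleteDirs, purge) follows the same pattern: pass null as the marker for the first page, then feed getNextMarker() back in while the listing is truncated. A small sketch of that loop against the store API above; the prefix handling is illustrative only:

    package org.apache.hadoop.fs.aliyun.oss;

    import com.aliyun.oss.model.OSSObjectSummary;
    import com.aliyun.oss.model.ObjectListing;

    /** Pagination sketch over AliyunOSSFileSystemStore.listObjects(). */
    final class ListAllKeysSketch {
      static void listAll(AliyunOSSFileSystemStore store, String prefix, int maxKeys) {
        ObjectListing objects = store.listObjects(prefix, maxKeys, "/", null);
        while (true) {
          for (OSSObjectSummary summary : objects.getObjectSummaries()) {
            System.out.println(summary.getKey() + " (" + summary.getSize() + " bytes)");
          }
          for (String dir : objects.getCommonPrefixes()) {
            System.out.println(dir + " (directory prefix)");
          }
          if (objects.isTruncated()) {
            // Resume the listing where the previous page stopped.
            objects = store.listObjects(prefix, maxKeys, "/",
                objects.getNextMarker());
          } else {
            break;
          }
        }
      }
    }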
+ 26 - 34
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java

@@ -27,12 +27,10 @@ import java.io.InputStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 
-import com.aliyun.oss.OSSClient;
-import com.aliyun.oss.model.GetObjectRequest;
-
 /**
  * The input stream for OSS blob system.
  * The class uses multi-part downloading to read data from the object content
@@ -40,27 +38,23 @@ import com.aliyun.oss.model.GetObjectRequest;
  */
 public class AliyunOSSInputStream extends FSInputStream {
   public static final Log LOG = LogFactory.getLog(AliyunOSSInputStream.class);
-  private static final int MAX_RETRIES = 10;
   private final long downloadPartSize;
-
-  private String bucketName;
-  private String key;
-  private OSSClient ossClient;
+  private AliyunOSSFileSystemStore store;
+  private final String key;
   private Statistics statistics;
   private boolean closed;
   private InputStream wrappedStream = null;
-  private long dataLen;
+  private long contentLength;
   private long position;
   private long partRemaining;
 
-  public AliyunOSSInputStream(Configuration conf, OSSClient client,
-      String bucketName, String key, Long dataLen, Statistics statistics)
-      throws IOException {
-    this.bucketName = bucketName;
+  public AliyunOSSInputStream(Configuration conf,
+      AliyunOSSFileSystemStore store, String key, Long contentLength,
+      Statistics statistics) throws IOException {
+    this.store = store;
     this.key = key;
-    ossClient = client;
     this.statistics = statistics;
-    this.dataLen = dataLen;
+    this.contentLength = contentLength;
     downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY,
         MULTIPART_DOWNLOAD_SIZE_DEFAULT);
     reopen(0);
@@ -75,18 +69,17 @@ public class AliyunOSSInputStream extends FSInputStream {
    * @throws IOException if failed to reopen
    */
   private synchronized void reopen(long pos) throws IOException {
-
-    long partLen;
+    long partSize;
 
     if (pos < 0) {
-      throw new EOFException("Cannot seek at negtive position:" + pos);
-    } else if (pos > dataLen) {
-      throw new EOFException("Cannot seek after EOF, fileLen:" + dataLen +
-          " position:" + pos);
-    } else if (pos + downloadPartSize > dataLen) {
-      partLen = dataLen - pos;
+      throw new EOFException("Cannot seek at negative position:" + pos);
+    } else if (pos > contentLength) {
+      throw new EOFException("Cannot seek after EOF, contentLength:" +
+          contentLength + " position:" + pos);
+    } else if (pos + downloadPartSize > contentLength) {
+      partSize = contentLength - pos;
     } else {
-      partLen = downloadPartSize;
+      partSize = downloadPartSize;
     }
 
     if (wrappedStream != null) {
@@ -96,21 +89,19 @@ public class AliyunOSSInputStream extends FSInputStream {
       wrappedStream.close();
     }
 
-    GetObjectRequest request = new GetObjectRequest(bucketName, key);
-    request.setRange(pos, pos + partLen - 1);
-    wrappedStream = ossClient.getObject(request).getObjectContent();
+    wrappedStream = store.retrieve(key, pos, pos + partSize -1);
     if (wrappedStream == null) {
       throw new IOException("Null IO stream");
     }
     position = pos;
-    partRemaining = partLen;
+    partRemaining = partSize;
   }
 
   @Override
   public synchronized int read() throws IOException {
     checkNotClosed();
 
-    if (partRemaining <= 0 && position < dataLen) {
+    if (partRemaining <= 0 && position < contentLength) {
       reopen(position);
     }
 
@@ -139,13 +130,14 @@ public class AliyunOSSInputStream extends FSInputStream {
 
 
   /**
-   * Check whether the input stream is closed.
+   * Verify that the input stream is open. Non blocking; this gives
+   * the last state of the volatile {@link #closed} field.
    *
-   * @throws IOException if stream is closed
+   * @throws IOException if the connection is closed.
    */
   private void checkNotClosed() throws IOException {
     if (closed) {
-      throw new IOException("Stream is closed!");
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
     }
   }
 
@@ -164,7 +156,7 @@ public class AliyunOSSInputStream extends FSInputStream {
 
     int bytesRead = 0;
     // Not EOF, and read not done
-    while (position < dataLen && bytesRead < len) {
+    while (position < contentLength && bytesRead < len) {
       if (partRemaining == 0) {
         reopen(position);
       }
@@ -219,7 +211,7 @@ public class AliyunOSSInputStream extends FSInputStream {
   public synchronized int available() throws IOException {
     checkNotClosed();
 
-    long remaining = dataLen - position;
+    long remaining = contentLength - position;
     if (remaining > Integer.MAX_VALUE) {
       return Integer.MAX_VALUE;
     }

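AliyunOSSInputStream now pulls ranged parts through store.retrieve(): reopen(pos) fetches [pos, pos + partSize - 1], where partSize is downloadPartSize capped at contentLength - pos, and a fresh range is opened whenever partRemaining reaches zero. Callers simply use the ordinary seek/read contract; a sketch with a placeholder path:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class RangedReadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("oss://example-bucket/data/part-00000");  // placeholder
        FileSystem fs = path.getFileSystem(conf);
        byte[] buffer = new byte[4096];
        try (FSDataInputStream in = fs.open(path)) {
          in.seek(1024L * 1024);   // move to an offset inside the object
          in.readFully(buffer);    // may cross part boundaries via ranged GETs
        }
        // Seeking beyond the content length raises EOFException, and any call on
        // a closed stream fails with FSExceptionMessages.STREAM_IS_CLOSED.
      }
    }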
+ 8 - 121
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java

@@ -22,15 +22,10 @@ import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
 
 import java.io.BufferedOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
 
-import com.aliyun.oss.ClientException;
-import com.aliyun.oss.OSSException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -38,18 +33,6 @@ import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.util.Progressable;
 
-import com.aliyun.oss.OSSClient;
-import com.aliyun.oss.model.AbortMultipartUploadRequest;
-import com.aliyun.oss.model.CompleteMultipartUploadRequest;
-import com.aliyun.oss.model.CompleteMultipartUploadResult;
-import com.aliyun.oss.model.InitiateMultipartUploadRequest;
-import com.aliyun.oss.model.InitiateMultipartUploadResult;
-import com.aliyun.oss.model.ObjectMetadata;
-import com.aliyun.oss.model.PartETag;
-import com.aliyun.oss.model.PutObjectResult;
-import com.aliyun.oss.model.UploadPartRequest;
-import com.aliyun.oss.model.UploadPartResult;
-
 /**
  * The output stream for OSS blob system.
  * Data will be buffered on local disk, then uploaded to OSS in
@@ -57,36 +40,24 @@ import com.aliyun.oss.model.UploadPartResult;
  */
 public class AliyunOSSOutputStream extends OutputStream {
   public static final Log LOG = LogFactory.getLog(AliyunOSSOutputStream.class);
-  private String bucketName;
-  private String key;
+  private AliyunOSSFileSystemStore store;
+  private final String key;
   private Statistics statistics;
   private Progressable progress;
-  private String serverSideEncryptionAlgorithm;
-  private long partSize;
   private long partSizeThreshold;
   private LocalDirAllocator dirAlloc;
   private boolean closed;
   private File tmpFile;
   private BufferedOutputStream backupStream;
-  private OSSClient ossClient;
 
-  public AliyunOSSOutputStream(Configuration conf, OSSClient client,
-      String bucketName, String key, Progressable progress,
-      Statistics statistics, String serverSideEncryptionAlgorithm)
-      throws IOException {
-    this.bucketName = bucketName;
+  public AliyunOSSOutputStream(Configuration conf,
+      AliyunOSSFileSystemStore store, String key, Progressable progress,
+      Statistics statistics) throws IOException {
+    this.store = store;
     this.key = key;
     // The caller cann't get any progress information
     this.progress = progress;
-    ossClient = client;
     this.statistics = statistics;
-    this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
-
-    partSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
-        MULTIPART_UPLOAD_SIZE_DEFAULT);
-    if (partSize < MIN_MULTIPART_UPLOAD_PART_SIZE) {
-      partSize = MIN_MULTIPART_UPLOAD_PART_SIZE;
-    }
     partSizeThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
         MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
 
@@ -113,9 +84,9 @@ public class AliyunOSSOutputStream extends OutputStream {
     long dataLen = tmpFile.length();
     try {
       if (dataLen <= partSizeThreshold) {
-        uploadObject();
+        store.uploadObject(key, tmpFile);
       } else {
-        multipartUploadObject();
+        store.multipartUploadObject(key, tmpFile);
       }
     } finally {
       if (!tmpFile.delete()) {
@@ -124,91 +95,7 @@ public class AliyunOSSOutputStream extends OutputStream {
     }
   }
 
-  /**
-   * Upload temporary file as an OSS object, using single upload.
-   *
-   * @throws IOException
-   */
-  private void uploadObject() throws IOException {
-    File object = tmpFile.getAbsoluteFile();
-    FileInputStream fis = new FileInputStream(object);
-    ObjectMetadata meta = new ObjectMetadata();
-    meta.setContentLength(object.length());
-    if (!serverSideEncryptionAlgorithm.isEmpty()) {
-      meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
-    }
-    try {
-      PutObjectResult result = ossClient.putObject(bucketName, key, fis, meta);
-      LOG.debug(result.getETag());
-      statistics.incrementWriteOps(1);
-    } finally {
-      fis.close();
-    }
-  }
-
-  /**
-   * Upload temporary file as an OSS object, using multipart upload.
-   *
-   * @throws IOException
-   */
-  private void multipartUploadObject() throws IOException {
-    File object = tmpFile.getAbsoluteFile();
-    long dataLen = object.length();
-    long realPartSize = AliyunOSSUtils.calculatePartSize(dataLen, partSize);
-    int partNum = (int)(dataLen / realPartSize);
-    if (dataLen % realPartSize != 0) {
-      partNum += 1;
-    }
-
-    InitiateMultipartUploadRequest initiateMultipartUploadRequest =
-        new InitiateMultipartUploadRequest(bucketName, key);
-    ObjectMetadata meta = new ObjectMetadata();
-    //    meta.setContentLength(dataLen);
-    if (!serverSideEncryptionAlgorithm.isEmpty()) {
-      meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
-    }
-    initiateMultipartUploadRequest.setObjectMetadata(meta);
-    InitiateMultipartUploadResult initiateMultipartUploadResult =
-        ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
-    List<PartETag> partETags = new ArrayList<PartETag>();
-    String uploadId = initiateMultipartUploadResult.getUploadId();
 
-    try {
-      for (int i = 0; i < partNum; i++) {
-        // TODO: Optimize this, avoid opening the object multiple times
-        FileInputStream fis = new FileInputStream(object);
-        try {
-          long skipBytes = realPartSize * i;
-          AliyunOSSUtils.skipFully(fis, skipBytes);
-          long size = (realPartSize < dataLen - skipBytes) ?
-              realPartSize : dataLen - skipBytes;
-          UploadPartRequest uploadPartRequest = new UploadPartRequest();
-          uploadPartRequest.setBucketName(bucketName);
-          uploadPartRequest.setKey(key);
-          uploadPartRequest.setUploadId(uploadId);
-          uploadPartRequest.setInputStream(fis);
-          uploadPartRequest.setPartSize(size);
-          uploadPartRequest.setPartNumber(i + 1);
-          UploadPartResult uploadPartResult =
-              ossClient.uploadPart(uploadPartRequest);
-          statistics.incrementWriteOps(1);
-          partETags.add(uploadPartResult.getPartETag());
-        } finally {
-          fis.close();
-        }
-      }
-      CompleteMultipartUploadRequest completeMultipartUploadRequest =
-          new CompleteMultipartUploadRequest(bucketName, key,
-          uploadId, partETags);
-      CompleteMultipartUploadResult completeMultipartUploadResult =
-          ossClient.completeMultipartUpload(completeMultipartUploadRequest);
-      LOG.debug(completeMultipartUploadResult.getETag());
-    } catch (OSSException | ClientException e) {
-      AbortMultipartUploadRequest abortMultipartUploadRequest =
-          new AbortMultipartUploadRequest(bucketName, key, uploadId);
-      ossClient.abortMultipartUpload(abortMultipartUploadRequest);
-    }
-  }
 
   @Override
   public synchronized void flush() throws IOException {

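For reference, the arithmetic that the removed multipartUploadObject() performed, and that AliyunOSSFileSystemStore now owns, can be reproduced in isolation. A minimal standalone sketch, assuming the usual OSS cap of 10,000 parts per upload (the value MULTIPART_UPLOAD_PART_NUM_LIMIT encodes) and using the 100 KB MIN_MULTIPART_UPLOAD_PART_SIZE from Constants as the floor; the class name and the numbers are illustrative only, not part of this patch:

// Illustrative only: mirrors the part-size/part-count derivation removed above.
public final class PartMath {
  // Assumption: OSS allows at most 10,000 parts per multipart upload.
  private static final long PART_NUM_LIMIT = 10000L;

  // Same shape as AliyunOSSUtils.calculatePartSize() in this patch.
  static long calculatePartSize(long contentLength, long minPartSize) {
    long tmpPartSize = contentLength / PART_NUM_LIMIT + 1;
    return Math.max(minPartSize, tmpPartSize);
  }

  public static void main(String[] args) {
    long dataLen = 52428800L;        // 50 MB, as in the large-upload test below
    long minPartSize = 100 * 1024L;  // MIN_MULTIPART_UPLOAD_PART_SIZE
    long realPartSize = calculatePartSize(dataLen, minPartSize);
    // Ceiling division, as in the removed multipartUploadObject().
    int partNum = (int) (dataLen / realPartSize);
    if (dataLen % realPartSize != 0) {
      partNum += 1;
    }
    System.out.println(realPartSize + " bytes per part, " + partNum + " parts");
  }
}

With these numbers the part size stays at the 100 KB floor and the file splits into 512 parts; in production it is the configured MULTIPART_UPLOAD_SIZE_KEY value that feeds the minPartSize argument.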
+ 83 - 110
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java

@@ -20,142 +20,58 @@ package org.apache.hadoop.fs.aliyun.oss;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.UnsupportedEncodingException;
 import java.net.URI;
-import java.net.URLDecoder;
-import java.util.Objects;
 
+import com.aliyun.oss.common.auth.CredentialsProvider;
+import com.aliyun.oss.common.auth.DefaultCredentialProvider;
+import com.aliyun.oss.common.auth.DefaultCredentials;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.ProviderUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT;
+import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
+import static org.apache.hadoop.fs.aliyun.oss.Constants.ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY;
 
 /**
  * Utility methods for Aliyun OSS code.
  */
 final public class AliyunOSSUtils {
-  private AliyunOSSUtils() {
-  }
-
-  /**
-   * User information includes user name and password.
-   */
-  static public class UserInfo {
-    private final String user;
-    private final String password;
-
-    public static final UserInfo EMPTY = new UserInfo("", "");
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AliyunOSSUtils.class);
 
-    public UserInfo(String user, String password) {
-      this.user = user;
-      this.password = password;
-    }
-
-    /**
-     * Predicate to verify user information is set.
-     * @return true if the username is defined (not null, not empty).
-     */
-    public boolean hasLogin() {
-      return StringUtils.isNotEmpty(user);
-    }
-
-    /**
-     * Equality test matches user and password.
-     * @param o other object
-     * @return true if the objects are considered equivalent.
-     */
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      UserInfo that = (UserInfo) o;
-      return Objects.equals(user, that.user) &&
-          Objects.equals(password, that.password);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(user, password);
-    }
-
-    public String getUser() {
-      return user;
-    }
-
-    public String getPassword() {
-      return password;
-    }
+  private AliyunOSSUtils() {
   }
 
   /**
-   * Used to get password from configuration, if default value is not available.
+   * Used to get password from configuration.
+   *
    * @param conf configuration that contains password information
    * @param key the key of the password
-   * @param val the default value of the key
    * @return the value for the key
    * @throws IOException if failed to get password from configuration
    */
-  static public String getPassword(Configuration conf, String key, String val)
+  static public String getPassword(Configuration conf, String key)
       throws IOException {
-    if (StringUtils.isEmpty(val)) {
-      try {
-        final char[] pass = conf.getPassword(key);
-        if (pass != null) {
-          return (new String(pass)).trim();
-        } else {
-          return "";
-        }
-      } catch (IOException ioe) {
-        throw new IOException("Cannot find password option " + key, ioe);
-      }
-    } else {
-      return val;
-    }
-  }
-
-  /**
-   * Extract the user information details from a URI.
-   * @param name URI of the filesystem.
-   * @return a login tuple, possibly empty.
-   */
-  public static UserInfo extractLoginDetails(URI name) {
     try {
-      String authority = name.getAuthority();
-      if (authority == null) {
-        return UserInfo.EMPTY;
-      }
-      int loginIndex = authority.indexOf('@');
-      if (loginIndex < 0) {
-        // No user information
-        return UserInfo.EMPTY;
-      }
-      String login = authority.substring(0, loginIndex);
-      int loginSplit = login.indexOf(':');
-      if (loginSplit > 0) {
-        String user = login.substring(0, loginSplit);
-        String password = URLDecoder.decode(login.substring(loginSplit + 1),
-            "UTF-8");
-        return new UserInfo(user, password);
-      } else if (loginSplit == 0) {
-        // There is no user, just a password.
-        return UserInfo.EMPTY;
+      final char[] pass = conf.getPassword(key);
+      if (pass != null) {
+        return (new String(pass)).trim();
       } else {
-        return new UserInfo(login, "");
+        return "";
       }
-    } catch (UnsupportedEncodingException e) {
-      // This should never happen; translate it if it does.
-      throw new RuntimeException(e);
+    } catch (IOException ioe) {
+      throw new IOException("Cannot find password option " + key, ioe);
     }
   }
 
   /**
-   * Skips the requested number of bytes or fail if there are not enough left.
-   * This allows for the possibility that {@link InputStream#skip(long)} may not
-   * skip as many bytes as requested (most likely because of reaching EOF).
+   * Skip the requested number of bytes or fail if there are not enough bytes
+   * left. This allows for the possibility that {@link InputStream#skip(long)}
+   * may not skip as many bytes as requested (most likely because of reaching
+   * EOF).
+   *
    * @param is the input stream to skip.
    * @param n the number of bytes to skip.
    * @throws IOException thrown when fewer bytes are skipped than requested.
@@ -179,12 +95,69 @@ final public class AliyunOSSUtils {
    * Calculate a proper size of multipart piece. If <code>minPartSize</code>
    * is too small, the number of multipart pieces may exceed the limit of
    * {@link Constants#MULTIPART_UPLOAD_PART_NUM_LIMIT}.
+   *
    * @param contentLength the size of file.
    * @param minPartSize the minimum size of multipart piece.
    * @return a revised size for each multipart piece.
-     */
+   */
   public static long calculatePartSize(long contentLength, long minPartSize) {
     long tmpPartSize = contentLength / MULTIPART_UPLOAD_PART_NUM_LIMIT + 1;
     return Math.max(minPartSize, tmpPartSize);
   }
+
+  /**
+   * Create credential provider specified by configuration, or create default
+   * credential provider if not specified.
+   *
+   * @param name the uri of the file system
+   * @param conf configuration
+   * @return a credential provider
+   * @throws IOException on any problem. Class construction issues may be
+   * nested inside the IOE.
+   */
+  public static CredentialsProvider getCredentialsProvider(URI name,
+      Configuration conf) throws IOException {
+    URI uri = java.net.URI.create(
+        name.getScheme() + "://" + name.getAuthority());
+    CredentialsProvider credentials;
+
+    String className = conf.getTrimmed(ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY);
+    if (StringUtils.isEmpty(className)) {
+      Configuration newConf =
+          ProviderUtils.excludeIncompatibleCredentialProviders(conf,
+              AliyunOSSFileSystem.class);
+      String accessKey =
+          AliyunOSSUtils.getPassword(newConf, ACCESS_KEY);
+      String secretKey =
+          AliyunOSSUtils.getPassword(newConf, SECRET_KEY);
+      credentials = new DefaultCredentialProvider(
+          new DefaultCredentials(accessKey, secretKey));
+    } else {
+      try {
+        LOG.debug("Credential provider class is:" + className);
+        Class<?> credClass = Class.forName(className);
+        try {
+          credentials =
+              (CredentialsProvider)credClass.getDeclaredConstructor(
+                  URI.class, Configuration.class).newInstance(uri, conf);
+        } catch (NoSuchMethodException | SecurityException e) {
+          credentials =
+              (CredentialsProvider)credClass.getDeclaredConstructor()
+              .newInstance();
+        }
+      } catch (ClassNotFoundException e) {
+        throw new IOException(className + " not found.", e);
+      } catch (NoSuchMethodException | SecurityException e) {
+        throw new IOException(String.format("%s constructor exception.  A " +
+            "class specified in %s must provide an accessible constructor " +
+            "accepting URI and Configuration, or an accessible default " +
+            "constructor.", className, ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY),
+            e);
+      } catch (ReflectiveOperationException | IllegalArgumentException e) {
+        throw new IOException(className + " instantiation exception.", e);
+      }
+    }
+
+    return credentials;
+  }
 }

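The reflection contract in getCredentialsProvider() means a custom provider only needs either a (URI, Configuration) constructor or an accessible no-arg constructor, plus an entry under ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY. A minimal sketch of such a provider, reusing the DefaultCredentialProvider and DefaultCredentials classes this patch already imports; the package, class name and the two configuration keys it reads are hypothetical, and it assumes DefaultCredentialProvider is subclassable in the SDK version in use:

package org.example.oss;  // hypothetical package, for illustration only

import java.net.URI;

import com.aliyun.oss.common.auth.DefaultCredentialProvider;
import com.aliyun.oss.common.auth.DefaultCredentials;
import org.apache.hadoop.conf.Configuration;

/**
 * Hypothetical provider: exposes the (URI, Configuration) constructor that
 * AliyunOSSUtils.getCredentialsProvider() looks up reflectively.
 */
public class StaticOSSCredentialsProvider extends DefaultCredentialProvider {

  public StaticOSSCredentialsProvider(URI uri, Configuration conf) {
    // Assumption: these two keys are made up for the example, not standard ones.
    super(new DefaultCredentials(
        conf.getTrimmed("example.oss.access.key"),
        conf.getTrimmed("example.oss.secret.key")));
  }
}

Wiring it up is then a single configuration entry naming the class under ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY; if the class cannot be found or constructed, the IOExceptions above spell out which part of the contract was violated.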
+ 2 - 1
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java

@@ -72,7 +72,7 @@ public final class Constants {
 
   // Number of records to get while paging through a directory listing
   public static final String MAX_PAGING_KEYS_KEY = "fs.oss.paging.maximum";
-  public static final int MAX_PAGING_KEYS_DEFAULT = 500;
+  public static final int MAX_PAGING_KEYS_DEFAULT = 1000;
 
  // Size of each multipart piece, in bytes
   public static final String MULTIPART_UPLOAD_SIZE_KEY =
@@ -109,5 +109,6 @@ public final class Constants {
   public static final String FS_OSS = "oss";
 
   public static final long MIN_MULTIPART_UPLOAD_PART_SIZE = 100 * 1024L;
+  public static final int MAX_RETRIES = 10;
 
 }

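The bumped MAX_PAGING_KEYS_DEFAULT only caps how many summaries each listObjects call returns; larger listings are stitched together page by page using the marker each response hands back. A minimal standalone sketch of that loop against the OSS SDK types this module already uses; the endpoint, bucket and credential strings are placeholders:

import com.aliyun.oss.OSSClient;
import com.aliyun.oss.model.ListObjectsRequest;
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectListing;

/** Illustrative paging loop; not part of this patch. */
public final class ListAllKeys {
  public static void main(String[] args) {
    // Placeholders: substitute a real endpoint, bucket and key pair.
    OSSClient client =
        new OSSClient("<endpoint>", "<accessKeyId>", "<accessKeySecret>");
    ListObjectsRequest request = new ListObjectsRequest("<bucket>");
    request.setMaxKeys(1000);  // mirrors MAX_PAGING_KEYS_DEFAULT above
    ObjectListing listing;
    do {
      listing = client.listObjects(request);
      for (OSSObjectSummary summary : listing.getObjectSummaries()) {
        System.out.println(summary.getKey());
      }
      // Continue from where this page stopped.
      request.setMarker(listing.getNextMarker());
    } while (listing.isTruncated());
    client.shutdown();
  }
}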
+ 0 - 10
hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSFileSystemContract.java

@@ -73,11 +73,6 @@ public class TestOSSFileSystemContract extends FileSystemContractBaseTest {
     // not supported
   }
 
-  /**
-   * Assert that root directory renames are not allowed.
-   *
-   * @throws Exception on failures
-   */
   @Override
   public void testRootDirAlwaysExists() throws Exception {
     //this will throw an exception if the path is not found
@@ -88,11 +83,6 @@ public class TestOSSFileSystemContract extends FileSystemContractBaseTest {
         fs.exists(super.path("/")));
   }
 
-  /**
-   * Assert that root directory renames are not allowed.
-   *
-   * @throws Exception on failures
-   */
   @Override
   public void testRenameRootDirForbidden() throws Exception {
     if (!renameSupported()) {

+ 121 - 0
hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSFileSystemStore.java

@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.*;
+import java.net.URI;
+import java.security.DigestInputStream;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeNotNull;
+
+/**
+ * Test the bridging logic between Hadoop's abstract filesystem and
+ * Aliyun OSS.
+ */
+public class TestOSSFileSystemStore {
+  private Configuration conf;
+  private AliyunOSSFileSystemStore store;
+  private AliyunOSSFileSystem fs;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    fs = new AliyunOSSFileSystem();
+    fs.initialize(URI.create(conf.get("test.fs.oss.name")), conf);
+    store = fs.getStore();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      store.purge("test");
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+  @BeforeClass
+  public static void checkSettings() throws Exception {
+    Configuration conf = new Configuration();
+    assumeNotNull(conf.get("fs.oss.accessKeyId"));
+    assumeNotNull(conf.get("fs.oss.accessKeySecret"));
+    assumeNotNull(conf.get("test.fs.oss.name"));
+  }
+
+  protected void writeRenameReadCompare(Path path, long len)
+      throws IOException, NoSuchAlgorithmException {
+    // If len > fs.oss.multipart.upload.threshold,
+    // we'll use a multipart upload copy
+    MessageDigest digest = MessageDigest.getInstance("MD5");
+    OutputStream out = new BufferedOutputStream(
+        new DigestOutputStream(fs.create(path, false), digest));
+    for (long i = 0; i < len; i++) {
+      out.write('Q');
+    }
+    out.flush();
+    out.close();
+
+    assertTrue("Exists", fs.exists(path));
+
+    Path copyPath = path.suffix(".copy");
+    fs.rename(path, copyPath);
+
+    assertTrue("Copy exists", fs.exists(copyPath));
+
+    // Download file from Aliyun OSS and compare the digest against the original
+    MessageDigest digest2 = MessageDigest.getInstance("MD5");
+    InputStream in = new BufferedInputStream(
+        new DigestInputStream(fs.open(copyPath), digest2));
+    long copyLen = 0;
+    while (in.read() != -1) {
+      copyLen++;
+    }
+    in.close();
+
+    assertEquals("Copy length matches original", len, copyLen);
+    assertArrayEquals("Digests match", digest.digest(), digest2.digest());
+  }
+
+  @Test
+  public void testSmallUpload() throws IOException, NoSuchAlgorithmException {
+    // Regular upload, regular copy
+    writeRenameReadCompare(new Path("/test/small"), 16384);
+  }
+
+  @Test
+  public void testLargeUpload()
+      throws IOException, NoSuchAlgorithmException {
+    // Multipart upload, multipart copy
+    writeRenameReadCompare(new Path("/test/xlarge"), 52428800L); // 50 MB
+  }
+}

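Nothing in writeRenameReadCompare() is OSS-specific beyond the filesystem handle: it wraps the write side and the read side in digest streams and compares the two MD5s. A minimal sketch of that digest-over-stream pattern on its own, with the local file name being a placeholder:

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

/** Illustrative only: MD5 of a stream, as the test computes on both sides. */
public final class StreamDigest {

  static byte[] md5Of(InputStream in)
      throws IOException, NoSuchAlgorithmException {
    MessageDigest digest = MessageDigest.getInstance("MD5");
    try (InputStream din =
        new DigestInputStream(new BufferedInputStream(in), digest)) {
      while (din.read() != -1) {
        // Draining the stream updates the digest as a side effect.
      }
    }
    return digest.digest();
  }

  public static void main(String[] args) throws Exception {
    // Placeholder path; equality of two such arrays is the round-trip check.
    byte[] md5 = md5Of(new FileInputStream("local.copy"));
    System.out.println(Arrays.toString(md5));
  }
}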
+ 0 - 1
hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/OSSContract.java

@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.fs.aliyun.oss.contract;
 
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.aliyun.oss.OSSTestUtils;

+ 44 - 0
hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractDispCp.java

@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.aliyun.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.tools.contract.AbstractContractDistCpTest;
+
+import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
+
+/**
+ * Contract test suite covering Aliyun OSS integration with DistCp.
+ */
+public class TestOSSContractDispCp extends AbstractContractDistCpTest {
+
+  private static final long MULTIPART_SETTING = 8 * 1024 * 1024; // 8 MB
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration newConf = super.createConfiguration();
+    newConf.setLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, MULTIPART_SETTING);
+    newConf.setLong(MULTIPART_UPLOAD_SIZE_KEY, MULTIPART_SETTING);
+    return newConf;
+  }
+
+  @Override
+  protected OSSContract createContract(Configuration conf) {
+    return new OSSContract(conf);
+  }
+}

+ 35 - 0
hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractGetFileStatus.java

@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.aliyun.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractGetFileStatusTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Test getFileStatus and related listing operations.
+ */
+public class TestOSSContractGetFileStatus
+    extends AbstractContractGetFileStatusTest {
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new OSSContract(conf);
+  }
+
+}

+ 69 - 0
hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestOSSContractRootDir.java

@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.fs.aliyun.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractRootDirectoryTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+/**
+ * Root dir operations against an Aliyun OSS bucket.
+ */
+public class TestOSSContractRootDir extends
+    AbstractContractRootDirectoryTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestOSSContractRootDir.class);
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new OSSContract(conf);
+  }
+
+  @Override
+  public void testListEmptyRootDirectory() throws IOException {
+    for (int attempt = 1, maxAttempts = 10; attempt <= maxAttempts; ++attempt) {
+      try {
+        super.testListEmptyRootDirectory();
+        break;
+      } catch (AssertionError | FileNotFoundException e) {
+        if (attempt < maxAttempts) {
+          LOG.info("Attempt {} of {} for empty root directory test failed.  "
+              + "Attempting retry.", attempt, maxAttempts);
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException e2) {
+            Thread.currentThread().interrupt();
+            fail("Test interrupted.");
+            break;
+          }
+        } else {
+          LOG.error(
+              "Empty root directory test failed after {} attempts.  "
+              + "Failing test.", maxAttempts);
+          throw e;
+        }
+      }
+    }
+  }
+}
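The override above retries because a just-emptied root can briefly keep showing entries in the listing. The same bounded-retry-with-sleep shape is reusable for any eventually consistent assertion; a minimal generic sketch, not tied to the contract-test classes:

/** Illustrative bounded retry for eventually consistent assertions. */
public final class EventuallyConsistent {

  interface Check {
    void run() throws Exception;
  }

  static void retry(int maxAttempts, long sleepMillis, Check check)
      throws Exception {
    for (int attempt = 1; ; attempt++) {
      try {
        check.run();
        return;
      } catch (AssertionError | Exception e) {
        if (attempt >= maxAttempts) {
          throw e;  // out of attempts: surface the last failure
        }
        Thread.sleep(sleepMillis);  // give the store time to converge
      }
    }
  }

  public static void main(String[] args) throws Exception {
    // Example check: keeps failing until three seconds have passed.
    long deadline = System.currentTimeMillis() + 3000;
    retry(10, 1000, () -> {
      if (System.currentTimeMillis() < deadline) {
        throw new AssertionError("listing not yet consistent");
      }
    });
    System.out.println("converged");
  }
}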