|
@@ -36,8 +36,8 @@ import com.aliyun.oss.model.GetObjectRequest;
|
|
|
import com.aliyun.oss.model.InitiateMultipartUploadRequest;
|
|
|
import com.aliyun.oss.model.InitiateMultipartUploadResult;
|
|
|
import com.aliyun.oss.model.ListObjectsRequest;
|
|
|
+import com.aliyun.oss.model.ListObjectsV2Request;
|
|
|
import com.aliyun.oss.model.ObjectMetadata;
|
|
|
-import com.aliyun.oss.model.ObjectListing;
|
|
|
import com.aliyun.oss.model.OSSObjectSummary;
|
|
|
import com.aliyun.oss.model.PartETag;
|
|
|
import com.aliyun.oss.model.PutObjectResult;
|
|
@@ -90,6 +90,7 @@ public class AliyunOSSFileSystemStore {
|
|
|
private long uploadPartSize;
|
|
|
private int maxKeys;
|
|
|
private String serverSideEncryptionAlgorithm;
|
|
|
+ private boolean useListV1;
|
|
|
|
|
|
public void initialize(URI uri, Configuration conf, String user,
|
|
|
FileSystem.Statistics stat) throws IOException {
|
|
@@ -170,6 +171,12 @@ public class AliyunOSSFileSystemStore {
|
|
|
}
|
|
|
|
|
|
maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
|
|
|
+ int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION);
|
|
|
+ if (listVersion < 1 || listVersion > 2) {
|
|
|
+ LOG.warn("Configured fs.oss.list.version {} is invalid, forcing " +
|
|
|
+ "version 2", listVersion);
|
|
|
+ }
|
|
|
+ useListV1 = (listVersion == 1);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -231,14 +238,10 @@ public class AliyunOSSFileSystemStore {
|
|
|
* @throws IOException if failed to delete directory.
|
|
|
*/
|
|
|
public void deleteDirs(String key) throws IOException {
|
|
|
- key = AliyunOSSUtils.maybeAddTrailingSlash(key);
|
|
|
- ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
|
|
|
- listRequest.setPrefix(key);
|
|
|
- listRequest.setDelimiter(null);
|
|
|
- listRequest.setMaxKeys(maxKeys);
|
|
|
-
|
|
|
+ OSSListRequest listRequest = createListObjectsRequest(key,
|
|
|
+ maxKeys, null, null, true);
|
|
|
while (true) {
|
|
|
- ObjectListing objects = ossClient.listObjects(listRequest);
|
|
|
+ OSSListResult objects = listObjects(listRequest);
|
|
|
statistics.incrementReadOps(1);
|
|
|
List<String> keysToDelete = new ArrayList<String>();
|
|
|
for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
|
|
@@ -246,7 +249,12 @@ public class AliyunOSSFileSystemStore {
|
|
|
}
|
|
|
deleteObjects(keysToDelete);
|
|
|
if (objects.isTruncated()) {
|
|
|
- listRequest.setMarker(objects.getNextMarker());
|
|
|
+ if (objects.isV1()) {
|
|
|
+ listRequest.getV1().setMarker(objects.getV1().getNextMarker());
|
|
|
+ } else {
|
|
|
+ listRequest.getV2().setContinuationToken(
|
|
|
+ objects.getV2().getNextContinuationToken());
|
|
|
+ }
|
|
|
} else {
|
|
|
break;
|
|
|
}
|
|
@@ -418,25 +426,76 @@ public class AliyunOSSFileSystemStore {
|
|
|
/**
|
|
|
* list objects.
|
|
|
*
|
|
|
+ * @param listRequest list request.
|
|
|
+ * @return a list of matches.
|
|
|
+ */
|
|
|
+ public OSSListResult listObjects(OSSListRequest listRequest) {
|
|
|
+ OSSListResult listResult;
|
|
|
+ if (listRequest.isV1()) {
|
|
|
+ listResult = OSSListResult.v1(
|
|
|
+ ossClient.listObjects(listRequest.getV1()));
|
|
|
+ } else {
|
|
|
+ listResult = OSSListResult.v2(
|
|
|
+ ossClient.listObjectsV2(listRequest.getV2()));
|
|
|
+ }
|
|
|
+ statistics.incrementReadOps(1);
|
|
|
+ return listResult;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * continue to list objects depends on previous list result.
|
|
|
+ *
|
|
|
+ * @param listRequest list request.
|
|
|
+ * @param preListResult previous list result.
|
|
|
+ * @return a list of matches.
|
|
|
+ */
|
|
|
+ public OSSListResult continueListObjects(OSSListRequest listRequest,
|
|
|
+ OSSListResult preListResult) {
|
|
|
+ OSSListResult listResult;
|
|
|
+ if (listRequest.isV1()) {
|
|
|
+ listRequest.getV1().setMarker(preListResult.getV1().getNextMarker());
|
|
|
+ listResult = OSSListResult.v1(
|
|
|
+ ossClient.listObjects(listRequest.getV1()));
|
|
|
+ } else {
|
|
|
+ listRequest.getV2().setContinuationToken(
|
|
|
+ preListResult.getV2().getNextContinuationToken());
|
|
|
+ listResult = OSSListResult.v2(
|
|
|
+ ossClient.listObjectsV2(listRequest.getV2()));
|
|
|
+ }
|
|
|
+ statistics.incrementReadOps(1);
|
|
|
+ return listResult;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * create list objects request.
|
|
|
+ *
|
|
|
* @param prefix prefix.
|
|
|
* @param maxListingLength max no. of entries
|
|
|
* @param marker last key in any previous search.
|
|
|
+ * @param continuationToken list from a specific point.
|
|
|
* @param recursive whether to list directory recursively.
|
|
|
* @return a list of matches.
|
|
|
*/
|
|
|
- public ObjectListing listObjects(String prefix, int maxListingLength,
|
|
|
- String marker, boolean recursive) {
|
|
|
+ protected OSSListRequest createListObjectsRequest(String prefix,
|
|
|
+ int maxListingLength, String marker,
|
|
|
+ String continuationToken, boolean recursive) {
|
|
|
String delimiter = recursive ? null : "/";
|
|
|
prefix = AliyunOSSUtils.maybeAddTrailingSlash(prefix);
|
|
|
- ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
|
|
|
- listRequest.setPrefix(prefix);
|
|
|
- listRequest.setDelimiter(delimiter);
|
|
|
- listRequest.setMaxKeys(maxListingLength);
|
|
|
- listRequest.setMarker(marker);
|
|
|
-
|
|
|
- ObjectListing listing = ossClient.listObjects(listRequest);
|
|
|
- statistics.incrementReadOps(1);
|
|
|
- return listing;
|
|
|
+ if (useListV1) {
|
|
|
+ ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
|
|
|
+ listRequest.setPrefix(prefix);
|
|
|
+ listRequest.setDelimiter(delimiter);
|
|
|
+ listRequest.setMaxKeys(maxListingLength);
|
|
|
+ listRequest.setMarker(marker);
|
|
|
+ return OSSListRequest.v1(listRequest);
|
|
|
+ } else {
|
|
|
+ ListObjectsV2Request listV2Request = new ListObjectsV2Request(bucketName);
|
|
|
+ listV2Request.setPrefix(prefix);
|
|
|
+ listV2Request.setDelimiter(delimiter);
|
|
|
+ listV2Request.setMaxKeys(maxListingLength);
|
|
|
+ listV2Request.setContinuationToken(continuationToken);
|
|
|
+ return OSSListRequest.v2(listV2Request);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -478,21 +537,7 @@ public class AliyunOSSFileSystemStore {
|
|
|
* @throws IOException if failed to clean up objects.
|
|
|
*/
|
|
|
public void purge(String prefix) throws IOException {
|
|
|
- String key;
|
|
|
- try {
|
|
|
- ObjectListing objects = listObjects(prefix, maxKeys, null, true);
|
|
|
- for (OSSObjectSummary object : objects.getObjectSummaries()) {
|
|
|
- key = object.getKey();
|
|
|
- ossClient.deleteObject(bucketName, key);
|
|
|
- statistics.incrementWriteOps(1);
|
|
|
- }
|
|
|
-
|
|
|
- for (String dir: objects.getCommonPrefixes()) {
|
|
|
- deleteDirs(dir);
|
|
|
- }
|
|
|
- } catch (OSSException | ClientException e) {
|
|
|
- LOG.error("Failed to purge " + prefix);
|
|
|
- }
|
|
|
+ deleteDirs(prefix);
|
|
|
}
|
|
|
|
|
|
public RemoteIterator<LocatedFileStatus> singleStatusRemoteIterator(
|
|
@@ -520,12 +565,12 @@ public class AliyunOSSFileSystemStore {
|
|
|
|
|
|
public RemoteIterator<LocatedFileStatus> createLocatedFileStatusIterator(
|
|
|
final String prefix, final int maxListingLength, FileSystem fs,
|
|
|
- PathFilter filter, FileStatusAcceptor acceptor, String delimiter) {
|
|
|
+ PathFilter filter, FileStatusAcceptor acceptor, boolean recursive) {
|
|
|
return new RemoteIterator<LocatedFileStatus>() {
|
|
|
- private String nextMarker = null;
|
|
|
private boolean firstListing = true;
|
|
|
private boolean meetEnd = false;
|
|
|
private ListIterator<FileStatus> batchIterator;
|
|
|
+ private OSSListRequest listRequest = null;
|
|
|
|
|
|
@Override
|
|
|
public boolean hasNext() throws IOException {
|
|
@@ -550,15 +595,24 @@ public class AliyunOSSFileSystemStore {
|
|
|
}
|
|
|
|
|
|
private boolean requestNextBatch() {
|
|
|
+ while (!meetEnd) {
|
|
|
+ if (continueListStatus()) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean continueListStatus() {
|
|
|
if (meetEnd) {
|
|
|
return false;
|
|
|
}
|
|
|
- ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
|
|
|
- listRequest.setPrefix(AliyunOSSUtils.maybeAddTrailingSlash(prefix));
|
|
|
- listRequest.setMaxKeys(maxListingLength);
|
|
|
- listRequest.setMarker(nextMarker);
|
|
|
- listRequest.setDelimiter(delimiter);
|
|
|
- ObjectListing listing = ossClient.listObjects(listRequest);
|
|
|
+ if (listRequest == null) {
|
|
|
+ listRequest = createListObjectsRequest(prefix,
|
|
|
+ maxListingLength, null, null, recursive);
|
|
|
+ }
|
|
|
+ OSSListResult listing = listObjects(listRequest);
|
|
|
List<FileStatus> stats = new ArrayList<>(
|
|
|
listing.getObjectSummaries().size() +
|
|
|
listing.getCommonPrefixes().size());
|
|
@@ -584,7 +638,12 @@ public class AliyunOSSFileSystemStore {
|
|
|
|
|
|
batchIterator = stats.listIterator();
|
|
|
if (listing.isTruncated()) {
|
|
|
- nextMarker = listing.getNextMarker();
|
|
|
+ if (listing.isV1()) {
|
|
|
+ listRequest.getV1().setMarker(listing.getV1().getNextMarker());
|
|
|
+ } else {
|
|
|
+ listRequest.getV2().setContinuationToken(
|
|
|
+ listing.getV2().getNextContinuationToken());
|
|
|
+ }
|
|
|
} else {
|
|
|
meetEnd = true;
|
|
|
}
|