|
@@ -34,6 +34,7 @@ import com.qcloud.cos.COSClient;
|
|
import com.qcloud.cos.ClientConfig;
|
|
import com.qcloud.cos.ClientConfig;
|
|
import com.qcloud.cos.auth.BasicCOSCredentials;
|
|
import com.qcloud.cos.auth.BasicCOSCredentials;
|
|
import com.qcloud.cos.auth.COSCredentials;
|
|
import com.qcloud.cos.auth.COSCredentials;
|
|
|
|
+import com.qcloud.cos.endpoint.SuffixEndpointBuilder;
|
|
import com.qcloud.cos.exception.CosClientException;
|
|
import com.qcloud.cos.exception.CosClientException;
|
|
import com.qcloud.cos.exception.CosServiceException;
|
|
import com.qcloud.cos.exception.CosServiceException;
|
|
import com.qcloud.cos.http.HttpProtocol;
|
|
import com.qcloud.cos.http.HttpProtocol;
|
|
@@ -64,7 +65,7 @@ import org.slf4j.LoggerFactory;
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
-import org.apache.hadoop.fs.cosn.auth.COSCredentialProviderList;
|
|
|
|
|
|
+import org.apache.hadoop.fs.cosn.auth.COSCredentialsProviderList;
|
|
import org.apache.hadoop.util.VersionInfo;
|
|
import org.apache.hadoop.util.VersionInfo;
|
|
import org.apache.http.HttpStatus;
|
|
import org.apache.http.HttpStatus;
|
|
|
|
|
|
@@ -89,9 +90,9 @@ class CosNativeFileSystemStore implements NativeFileSystemStore {
|
|
* @throws IOException Initialize the COS client failed,
|
|
* @throws IOException Initialize the COS client failed,
|
|
* caused by incorrect options.
|
|
* caused by incorrect options.
|
|
*/
|
|
*/
|
|
- private void initCOSClient(Configuration conf) throws IOException {
|
|
|
|
- COSCredentialProviderList credentialProviderList =
|
|
|
|
- CosNUtils.createCosCredentialsProviderSet(conf);
|
|
|
|
|
|
+ private void initCOSClient(URI uri, Configuration conf) throws IOException {
|
|
|
|
+ COSCredentialsProviderList credentialProviderList =
|
|
|
|
+ CosNUtils.createCosCredentialsProviderSet(uri, conf);
|
|
String region = conf.get(CosNConfigKeys.COSN_REGION_KEY);
|
|
String region = conf.get(CosNConfigKeys.COSN_REGION_KEY);
|
|
String endpointSuffix = conf.get(
|
|
String endpointSuffix = conf.get(
|
|
CosNConfigKeys.COSN_ENDPOINT_SUFFIX_KEY);
|
|
CosNConfigKeys.COSN_ENDPOINT_SUFFIX_KEY);
|
|
@@ -113,7 +114,7 @@ class CosNativeFileSystemStore implements NativeFileSystemStore {
|
|
ClientConfig config;
|
|
ClientConfig config;
|
|
if (null == region) {
|
|
if (null == region) {
|
|
config = new ClientConfig(new Region(""));
|
|
config = new ClientConfig(new Region(""));
|
|
- config.setEndPointSuffix(endpointSuffix);
|
|
|
|
|
|
+ config.setEndpointBuilder(new SuffixEndpointBuilder(endpointSuffix));
|
|
} else {
|
|
} else {
|
|
config = new ClientConfig(new Region(region));
|
|
config = new ClientConfig(new Region(region));
|
|
}
|
|
}
|
|
@@ -146,7 +147,7 @@ class CosNativeFileSystemStore implements NativeFileSystemStore {
|
|
@Override
|
|
@Override
|
|
public void initialize(URI uri, Configuration conf) throws IOException {
|
|
public void initialize(URI uri, Configuration conf) throws IOException {
|
|
try {
|
|
try {
|
|
- initCOSClient(conf);
|
|
|
|
|
|
+ initCOSClient(uri, conf);
|
|
this.bucketName = uri.getHost();
|
|
this.bucketName = uri.getHost();
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
handleException(e, "");
|
|
handleException(e, "");
|
|
@@ -174,8 +175,8 @@ class CosNativeFileSystemStore implements NativeFileSystemStore {
|
|
|
|
|
|
PutObjectResult putObjectResult =
|
|
PutObjectResult putObjectResult =
|
|
(PutObjectResult) callCOSClientWithRetry(putObjectRequest);
|
|
(PutObjectResult) callCOSClientWithRetry(putObjectRequest);
|
|
- LOG.debug("Store file successfully. COS key: [{}], ETag: [{}], "
|
|
|
|
- + "MD5: [{}].", key, putObjectResult.getETag(), new String(md5Hash));
|
|
|
|
|
|
+ LOG.debug("Store file successfully. COS key: [{}], ETag: [{}].",
|
|
|
|
+ key, putObjectResult.getETag());
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
String errMsg = String.format("Store file failed. COS key: [%s], "
|
|
String errMsg = String.format("Store file failed. COS key: [%s], "
|
|
+ "exception: [%s]", key, e.toString());
|
|
+ "exception: [%s]", key, e.toString());
|
|
@@ -196,8 +197,7 @@ class CosNativeFileSystemStore implements NativeFileSystemStore {
|
|
public void storeFile(String key, File file, byte[] md5Hash)
|
|
public void storeFile(String key, File file, byte[] md5Hash)
|
|
throws IOException {
|
|
throws IOException {
|
|
LOG.info("Store file from local path: [{}]. file length: [{}] COS key: " +
|
|
LOG.info("Store file from local path: [{}]. file length: [{}] COS key: " +
|
|
- "[{}] MD5: [{}].", file.getCanonicalPath(), file.length(), key,
|
|
|
|
- new String(md5Hash));
|
|
|
|
|
|
+ "[{}]", file.getCanonicalPath(), file.length(), key);
|
|
storeFileWithRetry(key, new BufferedInputStream(new FileInputStream(file)),
|
|
storeFileWithRetry(key, new BufferedInputStream(new FileInputStream(file)),
|
|
md5Hash, file.length());
|
|
md5Hash, file.length());
|
|
}
|
|
}
|
|
@@ -218,7 +218,7 @@ class CosNativeFileSystemStore implements NativeFileSystemStore {
|
|
byte[] md5Hash,
|
|
byte[] md5Hash,
|
|
long contentLength) throws IOException {
|
|
long contentLength) throws IOException {
|
|
LOG.info("Store file from input stream. COS key: [{}], "
|
|
LOG.info("Store file from input stream. COS key: [{}], "
|
|
- + "length: [{}], MD5: [{}].", key, contentLength, md5Hash);
|
|
|
|
|
|
+ + "length: [{}].", key, contentLength);
|
|
storeFileWithRetry(key, inputStream, md5Hash, contentLength);
|
|
storeFileWithRetry(key, inputStream, md5Hash, contentLength);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -250,7 +250,11 @@ class CosNativeFileSystemStore implements NativeFileSystemStore {
|
|
public PartETag uploadPart(File file, String key, String uploadId,
|
|
public PartETag uploadPart(File file, String key, String uploadId,
|
|
int partNum) throws IOException {
|
|
int partNum) throws IOException {
|
|
InputStream inputStream = new FileInputStream(file);
|
|
InputStream inputStream = new FileInputStream(file);
|
|
- return uploadPart(inputStream, key, uploadId, partNum, file.length());
|
|
|
|
|
|
+ try {
|
|
|
|
+ return uploadPart(inputStream, key, uploadId, partNum, file.length());
|
|
|
|
+ } finally {
|
|
|
|
+ inputStream.close();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|