|
@@ -22,8 +22,7 @@ import com.google.common.base.Preconditions;
|
|
import org.apache.hadoop.fs.FSExceptionMessages;
|
|
import org.apache.hadoop.fs.FSExceptionMessages;
|
|
import org.apache.hadoop.fs.FileEncryptionInfo;
|
|
import org.apache.hadoop.fs.FileEncryptionInfo;
|
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
|
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
|
|
|
- .ChecksumType;
|
|
|
|
|
|
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
|
|
import org.apache.hadoop.hdds.scm.container.ContainerID;
|
|
import org.apache.hadoop.hdds.scm.container.ContainerID;
|
|
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
|
|
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
|
|
import org.apache.hadoop.hdds.scm.storage.BufferPool;
|
|
import org.apache.hadoop.hdds.scm.storage.BufferPool;
|
|
@@ -52,7 +51,10 @@ import java.util.ArrayList;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Collection;
|
|
import java.util.Collection;
|
|
import java.util.ListIterator;
|
|
import java.util.ListIterator;
|
|
|
|
+import java.util.Map;
|
|
import java.util.concurrent.TimeoutException;
|
|
import java.util.concurrent.TimeoutException;
|
|
|
|
+import java.util.function.Function;
|
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
/**
|
|
* Maintaining a list of BlockInputStream. Write based on offset.
|
|
* Maintaining a list of BlockInputStream. Write based on offset.
|
|
@@ -95,7 +97,7 @@ public class KeyOutputStream extends OutputStream {
|
|
private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
|
|
private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
|
|
private FileEncryptionInfo feInfo;
|
|
private FileEncryptionInfo feInfo;
|
|
private ExcludeList excludeList;
|
|
private ExcludeList excludeList;
|
|
- private final RetryPolicy retryPolicy;
|
|
|
|
|
|
+ private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
|
|
private int retryCount;
|
|
private int retryCount;
|
|
private long offset;
|
|
private long offset;
|
|
/**
|
|
/**
|
|
@@ -121,7 +123,10 @@ public class KeyOutputStream extends OutputStream {
|
|
OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
|
|
OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
|
|
this.bytesPerChecksum = OzoneConfigKeys
|
|
this.bytesPerChecksum = OzoneConfigKeys
|
|
.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB
|
|
.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB
|
|
- this.retryPolicy = RetryPolicies.TRY_ONCE_THEN_FAIL;
|
|
|
|
|
|
+ this.retryPolicyMap = OzoneClientUtils.getExceptionList()
|
|
|
|
+ .stream()
|
|
|
|
+ .collect(Collectors.toMap(Function.identity(),
|
|
|
|
+ e -> RetryPolicies.TRY_ONCE_THEN_FAIL));
|
|
retryCount = 0;
|
|
retryCount = 0;
|
|
offset = 0;
|
|
offset = 0;
|
|
}
|
|
}
|
|
@@ -200,8 +205,8 @@ public class KeyOutputStream extends OutputStream {
|
|
this.bufferPool =
|
|
this.bufferPool =
|
|
new BufferPool(chunkSize, (int)streamBufferMaxSize / chunkSize);
|
|
new BufferPool(chunkSize, (int)streamBufferMaxSize / chunkSize);
|
|
this.excludeList = new ExcludeList();
|
|
this.excludeList = new ExcludeList();
|
|
- this.retryPolicy = OzoneClientUtils.createRetryPolicy(maxRetryCount,
|
|
|
|
- retryInterval);
|
|
|
|
|
|
+ this.retryPolicyMap = OzoneClientUtils.getRetryPolicyByException(
|
|
|
|
+ maxRetryCount, retryInterval);
|
|
this.retryCount = 0;
|
|
this.retryCount = 0;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -502,10 +507,14 @@ public class KeyOutputStream extends OutputStream {
|
|
}
|
|
}
|
|
|
|
|
|
private void handleRetry(IOException exception, long len) throws IOException {
|
|
private void handleRetry(IOException exception, long len) throws IOException {
|
|
|
|
+ RetryPolicy retryPolicy =
|
|
|
|
+ retryPolicyMap.get(checkForException(exception).getClass());
|
|
|
|
+ if (retryPolicy == null) {
|
|
|
|
+ retryPolicy = retryPolicyMap.get(Exception.class);
|
|
|
|
+ }
|
|
RetryPolicy.RetryAction action;
|
|
RetryPolicy.RetryAction action;
|
|
try {
|
|
try {
|
|
- action = retryPolicy
|
|
|
|
- .shouldRetry(exception, retryCount, 0, true);
|
|
|
|
|
|
+ action = retryPolicy.shouldRetry(exception, retryCount, 0, true);
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
throw e instanceof IOException ? (IOException) e : new IOException(e);
|
|
throw e instanceof IOException ? (IOException) e : new IOException(e);
|
|
}
|
|
}
|