@@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.fs.CanSetDropBehind;
@@ -76,6 +77,7 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
@@ -127,6 +129,13 @@ public class DFSOutputStream extends FSOutputSummer
     implements Syncable, CanSetDropBehind {
   private final long dfsclientSlowLogThresholdMs;
   private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
+  /**
+   * Number of times to retry creating a file when there are transient
+   * errors (typically related to encryption zones and KeyProvider operations).
+   */
+  @VisibleForTesting
+  public static final int CREATE_RETRY_COUNT = 10;
+
   private final DFSClient dfsClient;
   private Socket s;
   // closed is accessed by different threads under different locks.
@@ -1648,23 +1657,46 @@ public class DFSOutputStream extends FSOutputSummer
       short replication, long blockSize, Progressable progress, int buffersize,
       DataChecksum checksum, String[] favoredNodes,
       List<CipherSuite> cipherSuites) throws IOException {
-    final HdfsFileStatus stat;
-    try {
-      stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
-          new EnumSetWritable<CreateFlag>(flag), createParent, replication,
-          blockSize, cipherSuites);
-    } catch(RemoteException re) {
-      throw re.unwrapRemoteException(AccessControlException.class,
-          DSQuotaExceededException.class,
-          FileAlreadyExistsException.class,
-          FileNotFoundException.class,
-          ParentNotDirectoryException.class,
-          NSQuotaExceededException.class,
-          SafeModeException.class,
-          UnresolvedPathException.class,
-          SnapshotAccessControlException.class,
-          UnknownCipherSuiteException.class);
+    HdfsFileStatus stat = null;
+
+    // Retry the create if we get a RetryStartFileException up to a maximum
+    // number of times
+    boolean shouldRetry = true;
+    int retryCount = CREATE_RETRY_COUNT;
+    while (shouldRetry) {
+      shouldRetry = false;
+      try {
+        stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
+            new EnumSetWritable<CreateFlag>(flag), createParent, replication,
+            blockSize, cipherSuites);
+        break;
+      } catch (RemoteException re) {
+        IOException e = re.unwrapRemoteException(
+            AccessControlException.class,
+            DSQuotaExceededException.class,
+            FileAlreadyExistsException.class,
+            FileNotFoundException.class,
+            ParentNotDirectoryException.class,
+            NSQuotaExceededException.class,
+            RetryStartFileException.class,
+            SafeModeException.class,
+            UnresolvedPathException.class,
+            SnapshotAccessControlException.class,
+            UnknownCipherSuiteException.class);
+        if (e instanceof RetryStartFileException) {
+          if (retryCount > 0) {
+            shouldRetry = true;
+            retryCount--;
+          } else {
+            throw new IOException("Too many retries because of encryption" +
+                " zone operations", e);
+          }
+        } else {
+          throw e;
+        }
+      }
     }
+    Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
     final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
         flag, progress, checksum, favoredNodes);
     out.start();
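
Note for reviewers: the bounded-retry shape this patch introduces can be illustrated in isolation. The following is a minimal sketch, not the patched HDFS code itself; createOnce() is a hypothetical stand-in for the dfsClient.namenode.create(...) RPC, and RetryStartFileException is stubbed locally rather than imported from org.apache.hadoop.hdfs.server.namenode.

import java.io.IOException;

public class CreateRetrySketch {
  // Same retry budget the patch defines as CREATE_RETRY_COUNT.
  static final int CREATE_RETRY_COUNT = 10;

  // Local stub for the transient-failure exception type.
  static class RetryStartFileException extends IOException {}

  // Hypothetical single create attempt; may throw RetryStartFileException
  // when the failure is transient and worth retrying.
  static String createOnce() throws IOException {
    return "stat";
  }

  static String createWithRetries() throws IOException {
    String stat = null;
    boolean shouldRetry = true;
    int retryCount = CREATE_RETRY_COUNT;
    while (shouldRetry) {
      shouldRetry = false;
      try {
        stat = createOnce();
        break;                    // success: leave the loop immediately
      } catch (RetryStartFileException e) {
        if (retryCount > 0) {     // transient error: spend one retry
          shouldRetry = true;
          retryCount--;
        } else {                  // budget exhausted: surface the failure
          throw new IOException("Too many retries", e);
        }
      }
    }
    return stat;                  // non-null whenever the loop exits normally
  }
}

The loop resets shouldRetry to false at the top of each iteration, so exactly one of three things happens per pass: the create succeeds and breaks out, a transient error re-arms the loop, or the error is rethrown. This is why the Preconditions.checkNotNull(stat, ...) after the loop in the actual patch can never fire on the success path.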