@@ -27,6 +27,9 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIE
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
@@ -44,9 +47,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIR
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
@@ -100,6 +100,7 @@ import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -113,13 +114,13 @@ import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
@@ -144,6 +145,7 @@ import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -451,7 +453,11 @@ public class DFSClient implements java.io.Closeable {
 
   /**
    * Create a new DFSClient connected to the given nameNodeUri or rpcNamenode.
-   * Exactly one of nameNodeUri or rpcNamenode must be null.
+   * If HA is enabled and a positive value is set for
+   * {@link DFSConfigKeys#DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY} in the
+   * configuration, the DFSClient will use {@link LossyRetryInvocationHandler}
+   * as its RetryInvocationHandler. Otherwise one of nameNodeUri or rpcNamenode
+   * must be null.
    */
   @VisibleForTesting
   public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
@@ -475,7 +481,20 @@ public class DFSClient implements java.io.Closeable {
     this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" +
         DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId();
 
-    if (rpcNamenode != null) {
+    int numResponseToDrop = conf.getInt(
+        DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
+        DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
+    if (numResponseToDrop > 0) {
+      // This case is used for testing.
+      LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
+          + " is set to " + numResponseToDrop
+          + ", this hacked client will proactively drop responses");
+      NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = NameNodeProxies
+          .createProxyWithLossyRetryHandler(conf, nameNodeUri,
+              ClientProtocol.class, numResponseToDrop);
+      this.dtService = proxyInfo.getDelegationTokenService();
+      this.namenode = proxyInfo.getProxy();
+    } else if (rpcNamenode != null) {
       // This case is used for testing.
       Preconditions.checkArgument(nameNodeUri == null);
       this.namenode = rpcNamenode;
@@ -514,7 +533,7 @@ public class DFSClient implements java.io.Closeable {
     this.defaultWriteCachingStrategy =
         new CachingStrategy(writeDropBehind, readahead);
   }
-
+  
   /**
    * Return the socket addresses to use with each configured
    * local interface. Local interfaces may be specified by IP
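
For context, a minimal sketch of how the test hook introduced above might be exercised. It assumes an HA-enabled deployment whose nameservice settings are already configured; the URI "hdfs://mycluster", the class name LossyClientSketch, and the target path are hypothetical. DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY comes from the patch itself, and the single-URI DFSClient constructor used here is the existing convenience constructor that delegates to the one shown above.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class LossyClientSketch {
  public static void main(String[] args) throws Exception {
    // HA settings for the "mycluster" nameservice (hypothetical) must
    // already be present, e.g. via hdfs-site.xml on the classpath.
    Configuration conf = new Configuration();

    // A positive value makes the constructor above take the
    // NameNodeProxies.createProxyWithLossyRetryHandler(...) branch, so the
    // client proactively drops NameNode responses and relies on the
    // LossyRetryInvocationHandler to retry the dropped calls.
    conf.setInt(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY, 3);

    DFSClient client = new DFSClient(new URI("hdfs://mycluster"), conf);
    try {
      // The call should still succeed, only more slowly, since dropped
      // responses are reissued by the retry handler. A null permission
      // falls back to the default permission with the umask applied.
      client.mkdirs("/tmp/lossy-retry-test", null, true);
    } finally {
      client.close();
    }
  }
}

Leaving the key at its non-positive default keeps the normal proxy path, so the hook stays inert outside of tests.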