|
@@ -18,48 +18,11 @@
|
|
|
package org.apache.hadoop.hdfs;
|
|
|
|
|
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
|
|
|
|
|
|
import java.io.BufferedOutputStream;
|
|
|
import java.io.DataInputStream;
|
|
@@ -109,7 +72,6 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersi
|
|
|
import org.apache.hadoop.fs.BlockLocation;
|
|
|
import org.apache.hadoop.fs.BlockStorageLocation;
|
|
|
import org.apache.hadoop.fs.CacheFlag;
|
|
|
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
|
import org.apache.hadoop.fs.ContentSummary;
|
|
|
import org.apache.hadoop.fs.CreateFlag;
|
|
|
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
|
@@ -136,9 +98,9 @@ import org.apache.hadoop.fs.permission.AclEntry;
|
|
|
import org.apache.hadoop.fs.permission.AclStatus;
|
|
|
import org.apache.hadoop.fs.permission.FsAction;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
|
|
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
|
|
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
|
|
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
|
|
import org.apache.hadoop.hdfs.net.Peer;
|
|
|
import org.apache.hadoop.hdfs.net.TcpPeerServer;
|
|
|
import org.apache.hadoop.hdfs.protocol.AclException;
|
|
@@ -195,14 +157,12 @@ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
|
|
-import org.apache.hadoop.hdfs.util.ByteArrayManager;
|
|
|
import org.apache.hadoop.io.DataOutputBuffer;
|
|
|
import org.apache.hadoop.io.EnumSetWritable;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.io.MD5Hash;
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
|
|
|
-import org.apache.hadoop.ipc.Client;
|
|
|
import org.apache.hadoop.ipc.RPC;
|
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
|
import org.apache.hadoop.net.DNS;
|
|
@@ -250,7 +210,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
|
|
|
|
|
|
private final Configuration conf;
|
|
|
- private final Conf dfsClientConf;
|
|
|
+ private final DfsClientConf dfsClientConf;
|
|
|
final ClientProtocol namenode;
|
|
|
/* The service used for delegation tokens */
|
|
|
private Text dtService;
|
|
@@ -278,307 +238,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
|
|
|
private final Sampler<?> traceSampler;
|
|
|
|
|
|
- /**
|
|
|
- * DFSClient configuration
|
|
|
- */
|
|
|
- public static class Conf {
|
|
|
- final int hdfsTimeout; // timeout value for a DFS operation.
|
|
|
-
|
|
|
- final int maxFailoverAttempts;
|
|
|
- final int maxRetryAttempts;
|
|
|
- final int failoverSleepBaseMillis;
|
|
|
- final int failoverSleepMaxMillis;
|
|
|
- final int maxBlockAcquireFailures;
|
|
|
- final int confTime;
|
|
|
- final int ioBufferSize;
|
|
|
- final ChecksumOpt defaultChecksumOpt;
|
|
|
- final int writePacketSize;
|
|
|
- final int writeMaxPackets;
|
|
|
- final ByteArrayManager.Conf writeByteArrayManagerConf;
|
|
|
- final int socketTimeout;
|
|
|
- final int socketCacheCapacity;
|
|
|
- final long socketCacheExpiry;
|
|
|
- final long excludedNodesCacheExpiry;
|
|
|
- /** Wait time window (in msec) if BlockMissingException is caught */
|
|
|
- final int timeWindow;
|
|
|
- final int nCachedConnRetry;
|
|
|
- final int nBlockWriteRetry;
|
|
|
- final int nBlockWriteLocateFollowingRetry;
|
|
|
- final int blockWriteLocateFollowingInitialDelayMs;
|
|
|
- final long defaultBlockSize;
|
|
|
- final long prefetchSize;
|
|
|
- final short defaultReplication;
|
|
|
- final String taskId;
|
|
|
- final FsPermission uMask;
|
|
|
- final boolean connectToDnViaHostname;
|
|
|
- final boolean getHdfsBlocksMetadataEnabled;
|
|
|
- final int getFileBlockStorageLocationsNumThreads;
|
|
|
- final int getFileBlockStorageLocationsTimeoutMs;
|
|
|
- final int retryTimesForGetLastBlockLength;
|
|
|
- final int retryIntervalForGetLastBlockLength;
|
|
|
- final long datanodeRestartTimeout;
|
|
|
- final long dfsclientSlowIoWarningThresholdMs;
|
|
|
-
|
|
|
- final boolean useLegacyBlockReader;
|
|
|
- final boolean useLegacyBlockReaderLocal;
|
|
|
- final String domainSocketPath;
|
|
|
- final boolean skipShortCircuitChecksums;
|
|
|
- final int shortCircuitBufferSize;
|
|
|
- final boolean shortCircuitLocalReads;
|
|
|
- final boolean domainSocketDataTraffic;
|
|
|
- final int shortCircuitStreamsCacheSize;
|
|
|
- final long shortCircuitStreamsCacheExpiryMs;
|
|
|
- final int shortCircuitSharedMemoryWatcherInterruptCheckMs;
|
|
|
-
|
|
|
- final boolean shortCircuitMmapEnabled;
|
|
|
- final int shortCircuitMmapCacheSize;
|
|
|
- final long shortCircuitMmapCacheExpiryMs;
|
|
|
- final long shortCircuitMmapCacheRetryTimeout;
|
|
|
- final long shortCircuitCacheStaleThresholdMs;
|
|
|
-
|
|
|
- final long keyProviderCacheExpiryMs;
|
|
|
- public BlockReaderFactory.FailureInjector brfFailureInjector =
|
|
|
- new BlockReaderFactory.FailureInjector();
|
|
|
-
|
|
|
- public Conf(Configuration conf) {
|
|
|
- // The hdfsTimeout is currently the same as the ipc timeout
|
|
|
- hdfsTimeout = Client.getTimeout(conf);
|
|
|
- maxFailoverAttempts = conf.getInt(
|
|
|
- DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
|
|
|
- DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
|
|
|
- maxRetryAttempts = conf.getInt(
|
|
|
- HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY,
|
|
|
- HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT);
|
|
|
- failoverSleepBaseMillis = conf.getInt(
|
|
|
- DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY,
|
|
|
- DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT);
|
|
|
- failoverSleepMaxMillis = conf.getInt(
|
|
|
- DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY,
|
|
|
- DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT);
|
|
|
-
|
|
|
- maxBlockAcquireFailures = conf.getInt(
|
|
|
- DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
|
|
|
- DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
|
|
|
- confTime = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
|
|
|
- HdfsServerConstants.WRITE_TIMEOUT);
|
|
|
- ioBufferSize = conf.getInt(
|
|
|
- CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
|
|
|
- CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
|
|
|
- defaultChecksumOpt = getChecksumOptFromConf(conf);
|
|
|
- socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
|
|
|
- HdfsServerConstants.READ_TIMEOUT);
|
|
|
- /** dfs.write.packet.size is an internal config variable */
|
|
|
- writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
|
|
|
- DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
|
|
|
- writeMaxPackets = conf.getInt(
|
|
|
- DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_KEY,
|
|
|
- DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_IN_FLIGHT_DEFAULT);
|
|
|
-
|
|
|
- final boolean byteArrayManagerEnabled = conf.getBoolean(
|
|
|
- DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_KEY,
|
|
|
- DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_ENABLED_DEFAULT);
|
|
|
- if (!byteArrayManagerEnabled) {
|
|
|
- writeByteArrayManagerConf = null;
|
|
|
- } else {
|
|
|
- final int countThreshold = conf.getInt(
|
|
|
- DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_KEY,
|
|
|
- DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_THRESHOLD_DEFAULT);
|
|
|
- final int countLimit = conf.getInt(
|
|
|
- DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_KEY,
|
|
|
- DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_LIMIT_DEFAULT);
|
|
|
- final long countResetTimePeriodMs = conf.getLong(
|
|
|
- DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_KEY,
|
|
|
- DFSConfigKeys.DFS_CLIENT_WRITE_BYTE_ARRAY_MANAGER_COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
|
|
|
- writeByteArrayManagerConf = new ByteArrayManager.Conf(
|
|
|
- countThreshold, countLimit, countResetTimePeriodMs);
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY,
|
|
|
- DFS_BLOCK_SIZE_DEFAULT);
|
|
|
- defaultReplication = (short) conf.getInt(
|
|
|
- DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT);
|
|
|
- taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
|
|
|
- socketCacheCapacity = conf.getInt(DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
|
|
|
- DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
|
|
|
- socketCacheExpiry = conf.getLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
|
|
|
- DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT);
|
|
|
- excludedNodesCacheExpiry = conf.getLong(
|
|
|
- DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL,
|
|
|
- DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
|
|
|
- prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
|
|
|
- 10 * defaultBlockSize);
|
|
|
- timeWindow = conf.getInt(
|
|
|
- HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY,
|
|
|
- HdfsClientConfigKeys.Retry.WINDOW_BASE_DEFAULT);
|
|
|
- nCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
|
|
|
- DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
|
|
|
- nBlockWriteRetry = conf.getInt(DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY,
|
|
|
- DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT);
|
|
|
- nBlockWriteLocateFollowingRetry = conf.getInt(
|
|
|
- DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
|
|
|
- DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
|
|
|
- blockWriteLocateFollowingInitialDelayMs = conf.getInt(
|
|
|
- DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY,
|
|
|
- DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_DEFAULT);
|
|
|
- uMask = FsPermission.getUMask(conf);
|
|
|
- connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
|
|
|
- DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
|
|
|
- getHdfsBlocksMetadataEnabled = conf.getBoolean(
|
|
|
- DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
|
|
|
- DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
|
|
|
- getFileBlockStorageLocationsNumThreads = conf.getInt(
|
|
|
- DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS,
|
|
|
- DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT);
|
|
|
- getFileBlockStorageLocationsTimeoutMs = conf.getInt(
|
|
|
- DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS,
|
|
|
- DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT);
|
|
|
- retryTimesForGetLastBlockLength = conf.getInt(
|
|
|
- HdfsClientConfigKeys.Retry.TIMES_GET_LAST_BLOCK_LENGTH_KEY,
|
|
|
- HdfsClientConfigKeys.Retry.TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT);
|
|
|
- retryIntervalForGetLastBlockLength = conf.getInt(
|
|
|
- HdfsClientConfigKeys.Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_KEY,
|
|
|
- HdfsClientConfigKeys.Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_DEFAULT);
|
|
|
-
|
|
|
- useLegacyBlockReader = conf.getBoolean(
|
|
|
- DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER,
|
|
|
- DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
|
|
|
- useLegacyBlockReaderLocal = conf.getBoolean(
|
|
|
- DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
|
|
|
- DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
|
|
|
- shortCircuitLocalReads = conf.getBoolean(
|
|
|
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
|
|
|
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
|
|
|
- domainSocketDataTraffic = conf.getBoolean(
|
|
|
- DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
|
|
|
- DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
|
|
|
- domainSocketPath = conf.getTrimmed(
|
|
|
- DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
|
|
- DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT);
|
|
|
-
|
|
|
- if (BlockReaderLocal.LOG.isDebugEnabled()) {
|
|
|
- BlockReaderLocal.LOG.debug(
|
|
|
- DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL
|
|
|
- + " = " + useLegacyBlockReaderLocal);
|
|
|
- BlockReaderLocal.LOG.debug(
|
|
|
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY
|
|
|
- + " = " + shortCircuitLocalReads);
|
|
|
- BlockReaderLocal.LOG.debug(
|
|
|
- DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC
|
|
|
- + " = " + domainSocketDataTraffic);
|
|
|
- BlockReaderLocal.LOG.debug(
|
|
|
- DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY
|
|
|
- + " = " + domainSocketPath);
|
|
|
- }
|
|
|
-
|
|
|
- skipShortCircuitChecksums = conf.getBoolean(
|
|
|
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
|
|
|
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
|
|
|
- shortCircuitBufferSize = conf.getInt(
|
|
|
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
|
|
|
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
|
|
|
- shortCircuitStreamsCacheSize = conf.getInt(
|
|
|
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY,
|
|
|
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT);
|
|
|
- shortCircuitStreamsCacheExpiryMs = conf.getLong(
|
|
|
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
|
|
|
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT);
|
|
|
- shortCircuitMmapEnabled = conf.getBoolean(
|
|
|
- DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED,
|
|
|
- DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED_DEFAULT);
|
|
|
- shortCircuitMmapCacheSize = conf.getInt(
|
|
|
- DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE,
|
|
|
- DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT);
|
|
|
- shortCircuitMmapCacheExpiryMs = conf.getLong(
|
|
|
- DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS,
|
|
|
- DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT);
|
|
|
- shortCircuitMmapCacheRetryTimeout = conf.getLong(
|
|
|
- DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS,
|
|
|
- DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT);
|
|
|
- shortCircuitCacheStaleThresholdMs = conf.getLong(
|
|
|
- DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS,
|
|
|
- DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT);
|
|
|
- shortCircuitSharedMemoryWatcherInterruptCheckMs = conf.getInt(
|
|
|
- DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
|
|
|
- DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT);
|
|
|
-
|
|
|
- datanodeRestartTimeout = conf.getLong(
|
|
|
- DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY,
|
|
|
- DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT) * 1000;
|
|
|
- dfsclientSlowIoWarningThresholdMs = conf.getLong(
|
|
|
- DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
|
|
|
- DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
|
|
|
-
|
|
|
- keyProviderCacheExpiryMs = conf.getLong(
|
|
|
- DFSConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS,
|
|
|
- DFSConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT);
|
|
|
- }
|
|
|
-
|
|
|
- public boolean isUseLegacyBlockReaderLocal() {
|
|
|
- return useLegacyBlockReaderLocal;
|
|
|
- }
|
|
|
-
|
|
|
- public String getDomainSocketPath() {
|
|
|
- return domainSocketPath;
|
|
|
- }
|
|
|
-
|
|
|
- public boolean isShortCircuitLocalReads() {
|
|
|
- return shortCircuitLocalReads;
|
|
|
- }
|
|
|
-
|
|
|
- public boolean isDomainSocketDataTraffic() {
|
|
|
- return domainSocketDataTraffic;
|
|
|
- }
|
|
|
-
|
|
|
- private DataChecksum.Type getChecksumType(Configuration conf) {
|
|
|
- final String checksum = conf.get(
|
|
|
- DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY,
|
|
|
- DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
|
|
|
- try {
|
|
|
- return DataChecksum.Type.valueOf(checksum);
|
|
|
- } catch(IllegalArgumentException iae) {
|
|
|
- LOG.warn("Bad checksum type: " + checksum + ". Using default "
|
|
|
- + DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
|
|
|
- return DataChecksum.Type.valueOf(
|
|
|
- DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // Construct a checksum option from conf
|
|
|
- private ChecksumOpt getChecksumOptFromConf(Configuration conf) {
|
|
|
- DataChecksum.Type type = getChecksumType(conf);
|
|
|
- int bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
|
|
|
- DFS_BYTES_PER_CHECKSUM_DEFAULT);
|
|
|
- return new ChecksumOpt(type, bytesPerChecksum);
|
|
|
- }
|
|
|
-
|
|
|
- // create a DataChecksum with the default option.
|
|
|
- private DataChecksum createChecksum() throws IOException {
|
|
|
- return createChecksum(null);
|
|
|
- }
|
|
|
-
|
|
|
- private DataChecksum createChecksum(ChecksumOpt userOpt) {
|
|
|
- // Fill in any missing field with the default.
|
|
|
- ChecksumOpt myOpt = ChecksumOpt.processChecksumOpt(
|
|
|
- defaultChecksumOpt, userOpt);
|
|
|
- DataChecksum dataChecksum = DataChecksum.newDataChecksum(
|
|
|
- myOpt.getChecksumType(),
|
|
|
- myOpt.getBytesPerChecksum());
|
|
|
- if (dataChecksum == null) {
|
|
|
- throw new HadoopIllegalArgumentException("Invalid checksum type: userOpt="
|
|
|
- + userOpt + ", default=" + defaultChecksumOpt
|
|
|
- + ", effective=null");
|
|
|
- }
|
|
|
- return dataChecksum;
|
|
|
- }
|
|
|
-
|
|
|
- @VisibleForTesting
|
|
|
- public int getBlockWriteLocateFollowingInitialDelayMs() {
|
|
|
- return blockWriteLocateFollowingInitialDelayMs;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public Conf getConf() {
|
|
|
+ public DfsClientConf getConf() {
|
|
|
return dfsClientConf;
|
|
|
}
|
|
|
|
|
@@ -642,10 +302,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
SpanReceiverHost.getInstance(conf);
|
|
|
traceSampler = new SamplerBuilder(TraceUtils.wrapHadoopConf(conf)).build();
|
|
|
// Copy only the required DFSClient configuration
|
|
|
- this.dfsClientConf = new Conf(conf);
|
|
|
- if (this.dfsClientConf.useLegacyBlockReaderLocal) {
|
|
|
- LOG.debug("Using legacy short-circuit local reads.");
|
|
|
- }
|
|
|
+ this.dfsClientConf = new DfsClientConf(conf);
|
|
|
this.conf = conf;
|
|
|
this.stats = stats;
|
|
|
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
|
|
@@ -654,7 +311,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
this.ugi = UserGroupInformation.getCurrentUser();
|
|
|
|
|
|
this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
|
|
|
- this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" +
|
|
|
+ this.clientName = "DFSClient_" + dfsClientConf.getTaskId() + "_" +
|
|
|
DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId();
|
|
|
int numResponseToDrop = conf.getInt(
|
|
|
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
|
|
@@ -778,31 +435,18 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
return addr;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Return the number of times the client should go back to the namenode
|
|
|
- * to retrieve block locations when reading.
|
|
|
- */
|
|
|
- int getMaxBlockAcquireFailures() {
|
|
|
- return dfsClientConf.maxBlockAcquireFailures;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Return the timeout that clients should use when writing to datanodes.
|
|
|
* @param numNodes the number of nodes in the pipeline.
|
|
|
*/
|
|
|
int getDatanodeWriteTimeout(int numNodes) {
|
|
|
- return (dfsClientConf.confTime > 0) ?
|
|
|
- (dfsClientConf.confTime + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * numNodes) : 0;
|
|
|
+ final int t = dfsClientConf.getDatanodeSocketWriteTimeout();
|
|
|
+ return t > 0? t + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION*numNodes: 0;
|
|
|
}
|
|
|
|
|
|
int getDatanodeReadTimeout(int numNodes) {
|
|
|
- return dfsClientConf.socketTimeout > 0 ?
|
|
|
- (HdfsServerConstants.READ_TIMEOUT_EXTENSION * numNodes +
|
|
|
- dfsClientConf.socketTimeout) : 0;
|
|
|
- }
|
|
|
-
|
|
|
- int getHdfsTimeout() {
|
|
|
- return dfsClientConf.hdfsTimeout;
|
|
|
+ final int t = dfsClientConf.getSocketTimeout();
|
|
|
+ return t > 0? HdfsServerConstants.READ_TIMEOUT_EXTENSION*numNodes + t: 0;
|
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
@@ -991,14 +635,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Get the default block size for this cluster
|
|
|
- * @return the default block size in bytes
|
|
|
- */
|
|
|
- public long getDefaultBlockSize() {
|
|
|
- return dfsClientConf.defaultBlockSize;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* @see ClientProtocol#getPreferredBlockSize(String)
|
|
|
*/
|
|
@@ -1211,13 +847,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
namenode.reportBadBlocks(blocks);
|
|
|
}
|
|
|
|
|
|
- public short getDefaultReplication() {
|
|
|
- return dfsClientConf.defaultReplication;
|
|
|
- }
|
|
|
-
|
|
|
public LocatedBlocks getLocatedBlocks(String src, long start)
|
|
|
throws IOException {
|
|
|
- return getLocatedBlocks(src, start, dfsClientConf.prefetchSize);
|
|
|
+ return getLocatedBlocks(src, start, dfsClientConf.getPrefetchSize());
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -1319,7 +951,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
public BlockStorageLocation[] getBlockStorageLocations(
|
|
|
List<BlockLocation> blockLocations) throws IOException,
|
|
|
UnsupportedOperationException, InvalidBlockTokenException {
|
|
|
- if (!getConf().getHdfsBlocksMetadataEnabled) {
|
|
|
+ if (!getConf().isHdfsBlocksMetadataEnabled()) {
|
|
|
throw new UnsupportedOperationException("Datanode-side support for " +
|
|
|
"getVolumeBlockLocations() must also be enabled in the client " +
|
|
|
"configuration.");
|
|
@@ -1356,9 +988,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
try {
|
|
|
metadatas = BlockStorageLocationUtil.
|
|
|
queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
|
|
|
- getConf().getFileBlockStorageLocationsNumThreads,
|
|
|
- getConf().getFileBlockStorageLocationsTimeoutMs,
|
|
|
- getConf().connectToDnViaHostname);
|
|
|
+ getConf().getFileBlockStorageLocationsNumThreads(),
|
|
|
+ getConf().getFileBlockStorageLocationsTimeoutMs(),
|
|
|
+ getConf().isConnectToDnViaHostname());
|
|
|
if (LOG.isTraceEnabled()) {
|
|
|
LOG.trace("metadata returned: "
|
|
|
+ Joiner.on("\n").withKeyValueSeparator("=").join(metadatas));
|
|
@@ -1512,7 +1144,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
|
|
|
public DFSInputStream open(String src)
|
|
|
throws IOException, UnresolvedLinkException {
|
|
|
- return open(src, dfsClientConf.ioBufferSize, true, null);
|
|
|
+ return open(src, dfsClientConf.getIoBufferSize(), true, null);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1563,8 +1195,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
*/
|
|
|
public OutputStream create(String src, boolean overwrite)
|
|
|
throws IOException {
|
|
|
- return create(src, overwrite, dfsClientConf.defaultReplication,
|
|
|
- dfsClientConf.defaultBlockSize, null);
|
|
|
+ return create(src, overwrite, dfsClientConf.getDefaultReplication(),
|
|
|
+ dfsClientConf.getDefaultBlockSize(), null);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1574,8 +1206,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
public OutputStream create(String src,
|
|
|
boolean overwrite,
|
|
|
Progressable progress) throws IOException {
|
|
|
- return create(src, overwrite, dfsClientConf.defaultReplication,
|
|
|
- dfsClientConf.defaultBlockSize, progress);
|
|
|
+ return create(src, overwrite, dfsClientConf.getDefaultReplication(),
|
|
|
+ dfsClientConf.getDefaultBlockSize(), progress);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1596,7 +1228,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
public OutputStream create(String src, boolean overwrite, short replication,
|
|
|
long blockSize, Progressable progress) throws IOException {
|
|
|
return create(src, overwrite, replication, blockSize, progress,
|
|
|
- dfsClientConf.ioBufferSize);
|
|
|
+ dfsClientConf.getIoBufferSize());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1678,6 +1310,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
progress, buffersize, checksumOpt, null);
|
|
|
}
|
|
|
|
|
|
+ private FsPermission applyUMask(FsPermission permission) {
|
|
|
+ if (permission == null) {
|
|
|
+ permission = FsPermission.getFileDefault();
|
|
|
+ }
|
|
|
+ return permission.applyUMask(dfsClientConf.getUMask());
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
|
|
|
* Progressable, int, ChecksumOpt)} with the addition of favoredNodes that is
|
|
@@ -1698,10 +1337,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
ChecksumOpt checksumOpt,
|
|
|
InetSocketAddress[] favoredNodes) throws IOException {
|
|
|
checkOpen();
|
|
|
- if (permission == null) {
|
|
|
- permission = FsPermission.getFileDefault();
|
|
|
- }
|
|
|
- FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
|
|
|
+ final FsPermission masked = applyUMask(permission);
|
|
|
if(LOG.isDebugEnabled()) {
|
|
|
LOG.debug(src + ": masked=" + masked);
|
|
|
}
|
|
@@ -1783,8 +1419,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
throws IOException {
|
|
|
TraceScope scope = getPathTraceScope("createSymlink", target);
|
|
|
try {
|
|
|
- FsPermission dirPerm =
|
|
|
- FsPermission.getDefault().applyUMask(dfsClientConf.uMask);
|
|
|
+ final FsPermission dirPerm = applyUMask(null);
|
|
|
namenode.createSymlink(target, link, dirPerm, createParent);
|
|
|
} catch (RemoteException re) {
|
|
|
throw re.unwrapRemoteException(AccessControlException.class,
|
|
@@ -1828,7 +1463,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
new EnumSetWritable<>(flag, CreateFlag.class));
|
|
|
return DFSOutputStream.newStreamForAppend(this, src, flag, buffersize,
|
|
|
progress, blkWithStatus.getLastBlock(),
|
|
|
- blkWithStatus.getFileStatus(), dfsClientConf.createChecksum(),
|
|
|
+ blkWithStatus.getFileStatus(), dfsClientConf.createChecksum(null),
|
|
|
favoredNodes);
|
|
|
} catch(RemoteException re) {
|
|
|
throw re.unwrapRemoteException(AccessControlException.class,
|
|
@@ -2253,7 +1888,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
final DatanodeInfo[] datanodes = lb.getLocations();
|
|
|
|
|
|
//try each datanode location of the block
|
|
|
- final int timeout = 3000 * datanodes.length + dfsClientConf.socketTimeout;
|
|
|
+ final int timeout = 3000*datanodes.length + dfsClientConf.getSocketTimeout();
|
|
|
boolean done = false;
|
|
|
for(int j = 0; !done && j < datanodes.length; j++) {
|
|
|
DataOutputStream out = null;
|
|
@@ -2391,7 +2026,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
Socket sock = null;
|
|
|
try {
|
|
|
sock = socketFactory.createSocket();
|
|
|
- String dnAddr = dn.getXferAddr(getConf().connectToDnViaHostname);
|
|
|
+ String dnAddr = dn.getXferAddr(getConf().isConnectToDnViaHostname());
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Connecting to datanode " + dnAddr);
|
|
|
}
|
|
@@ -2424,7 +2059,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
*/
|
|
|
private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
|
|
|
throws IOException {
|
|
|
- IOStreamPair pair = connectToDN(dn, dfsClientConf.socketTimeout, lb);
|
|
|
+ IOStreamPair pair = connectToDN(dn, dfsClientConf.getSocketTimeout(), lb);
|
|
|
|
|
|
try {
|
|
|
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out,
|
|
@@ -2979,10 +2614,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
*/
|
|
|
public boolean mkdirs(String src, FsPermission permission,
|
|
|
boolean createParent) throws IOException {
|
|
|
- if (permission == null) {
|
|
|
- permission = FsPermission.getDefault();
|
|
|
- }
|
|
|
- FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
|
|
|
+ final FsPermission masked = applyUMask(permission);
|
|
|
return primitiveMkdir(src, masked, createParent);
|
|
|
}
|
|
|
|
|
@@ -3004,8 +2636,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
throws IOException {
|
|
|
checkOpen();
|
|
|
if (absPermission == null) {
|
|
|
- absPermission =
|
|
|
- FsPermission.getDefault().applyUMask(dfsClientConf.uMask);
|
|
|
+ absPermission = applyUMask(null);
|
|
|
}
|
|
|
|
|
|
if(LOG.isDebugEnabled()) {
|
|
@@ -3447,14 +3078,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
Peer peer = null;
|
|
|
boolean success = false;
|
|
|
Socket sock = null;
|
|
|
+ final int socketTimeout = dfsClientConf.getSocketTimeout();
|
|
|
try {
|
|
|
sock = socketFactory.createSocket();
|
|
|
- NetUtils.connect(sock, addr,
|
|
|
- getRandomLocalInterfaceAddr(),
|
|
|
- dfsClientConf.socketTimeout);
|
|
|
+ NetUtils.connect(sock, addr, getRandomLocalInterfaceAddr(), socketTimeout);
|
|
|
peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this,
|
|
|
blockToken, datanodeId);
|
|
|
- peer.setReadTimeout(dfsClientConf.socketTimeout);
|
|
|
+ peer.setReadTimeout(socketTimeout);
|
|
|
success = true;
|
|
|
return peer;
|
|
|
} finally {
|