|
@@ -17,49 +17,78 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.hdfs.client.impl;
|
|
|
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
|
|
|
-
|
|
|
-import org.apache.commons.logging.Log;
|
|
|
-import org.apache.commons.logging.LogFactory;
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
|
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
-import org.apache.hadoop.hdfs.BlockReaderFactory;
|
|
|
-import org.apache.hadoop.hdfs.DFSClient;
|
|
|
-import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
|
|
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
|
|
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
|
import org.apache.hadoop.hdfs.util.ByteArrayManager;
|
|
|
import org.apache.hadoop.ipc.Client;
|
|
|
import org.apache.hadoop.util.DataChecksum;
|
|
|
-
|
|
|
-import com.google.common.annotations.VisibleForTesting;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_REPLICATION_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_REPLICATION_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Failover;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.HedgedRead;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Mmap;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Read;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Retry;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.ShortCircuit;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write;
|
|
|
|
|
|
/**
|
|
|
- * DFSClient configuration
|
|
|
+ * DFSClient configuration.
|
|
|
*/
|
|
|
public class DfsClientConf {
|
|
|
+ private static final Logger LOG = LoggerFactory.getLogger(DfsClientConf
|
|
|
+ .class);
|
|
|
|
|
|
private final int hdfsTimeout; // timeout value for a DFS operation.
|
|
|
|
|
@@ -76,7 +105,7 @@ public class DfsClientConf {
|
|
|
private final ByteArrayManager.Conf writeByteArrayManagerConf;
|
|
|
private final int socketTimeout;
|
|
|
private final long excludedNodesCacheExpiry;
|
|
|
- /** Wait time window (in msec) if BlockMissingException is caught */
|
|
|
+ /** Wait time window (in msec) if BlockMissingException is caught. */
|
|
|
private final int timeWindow;
|
|
|
private final int numCachedConnRetry;
|
|
|
private final int numBlockWriteRetry;
|
|
@@ -94,96 +123,97 @@ public class DfsClientConf {
|
|
|
private final long slowIoWarningThresholdMs;
|
|
|
|
|
|
private final ShortCircuitConf shortCircuitConf;
|
|
|
-
|
|
|
+
|
|
|
private final long hedgedReadThresholdMillis;
|
|
|
private final int hedgedReadThreadpoolSize;
|
|
|
|
|
|
public DfsClientConf(Configuration conf) {
|
|
|
- // The hdfsTimeout is currently the same as the ipc timeout
|
|
|
+ // The hdfsTimeout is currently the same as the ipc timeout
|
|
|
hdfsTimeout = Client.getTimeout(conf);
|
|
|
|
|
|
maxRetryAttempts = conf.getInt(
|
|
|
- HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY,
|
|
|
- HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT);
|
|
|
+ Retry.MAX_ATTEMPTS_KEY,
|
|
|
+ Retry.MAX_ATTEMPTS_DEFAULT);
|
|
|
timeWindow = conf.getInt(
|
|
|
- HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY,
|
|
|
- HdfsClientConfigKeys.Retry.WINDOW_BASE_DEFAULT);
|
|
|
+ Retry.WINDOW_BASE_KEY,
|
|
|
+ Retry.WINDOW_BASE_DEFAULT);
|
|
|
retryTimesForGetLastBlockLength = conf.getInt(
|
|
|
- HdfsClientConfigKeys.Retry.TIMES_GET_LAST_BLOCK_LENGTH_KEY,
|
|
|
- HdfsClientConfigKeys.Retry.TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT);
|
|
|
+ Retry.TIMES_GET_LAST_BLOCK_LENGTH_KEY,
|
|
|
+ Retry.TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT);
|
|
|
retryIntervalForGetLastBlockLength = conf.getInt(
|
|
|
- HdfsClientConfigKeys.Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_KEY,
|
|
|
- HdfsClientConfigKeys.Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_DEFAULT);
|
|
|
+ Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_KEY,
|
|
|
+ Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_DEFAULT);
|
|
|
|
|
|
maxFailoverAttempts = conf.getInt(
|
|
|
- HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY,
|
|
|
- HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT);
|
|
|
+ Failover.MAX_ATTEMPTS_KEY,
|
|
|
+ Failover.MAX_ATTEMPTS_DEFAULT);
|
|
|
failoverSleepBaseMillis = conf.getInt(
|
|
|
- HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY,
|
|
|
- HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT);
|
|
|
+ Failover.SLEEPTIME_BASE_KEY,
|
|
|
+ Failover.SLEEPTIME_BASE_DEFAULT);
|
|
|
failoverSleepMaxMillis = conf.getInt(
|
|
|
- HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_KEY,
|
|
|
- HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_DEFAULT);
|
|
|
+ Failover.SLEEPTIME_MAX_KEY,
|
|
|
+ Failover.SLEEPTIME_MAX_DEFAULT);
|
|
|
|
|
|
maxBlockAcquireFailures = conf.getInt(
|
|
|
DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
|
|
|
DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
|
|
|
- datanodeSocketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
|
|
|
- HdfsServerConstants.WRITE_TIMEOUT);
|
|
|
+ datanodeSocketWriteTimeout = conf.getInt(
|
|
|
+ DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
|
|
|
+ HdfsConstants.WRITE_TIMEOUT);
|
|
|
ioBufferSize = conf.getInt(
|
|
|
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
|
|
|
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
|
|
|
defaultChecksumOpt = getChecksumOptFromConf(conf);
|
|
|
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
|
|
|
- HdfsServerConstants.READ_TIMEOUT);
|
|
|
+ HdfsConstants.READ_TIMEOUT);
|
|
|
/** dfs.write.packet.size is an internal config variable */
|
|
|
writePacketSize = conf.getInt(
|
|
|
- DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
|
|
|
- DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
|
|
|
+ DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
|
|
|
+ DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
|
|
|
writeMaxPackets = conf.getInt(
|
|
|
- HdfsClientConfigKeys.Write.MAX_PACKETS_IN_FLIGHT_KEY,
|
|
|
- HdfsClientConfigKeys.Write.MAX_PACKETS_IN_FLIGHT_DEFAULT);
|
|
|
-
|
|
|
+ Write.MAX_PACKETS_IN_FLIGHT_KEY,
|
|
|
+ Write.MAX_PACKETS_IN_FLIGHT_DEFAULT);
|
|
|
+
|
|
|
final boolean byteArrayManagerEnabled = conf.getBoolean(
|
|
|
- HdfsClientConfigKeys.Write.ByteArrayManager.ENABLED_KEY,
|
|
|
- HdfsClientConfigKeys.Write.ByteArrayManager.ENABLED_DEFAULT);
|
|
|
+ Write.ByteArrayManager.ENABLED_KEY,
|
|
|
+ Write.ByteArrayManager.ENABLED_DEFAULT);
|
|
|
if (!byteArrayManagerEnabled) {
|
|
|
writeByteArrayManagerConf = null;
|
|
|
} else {
|
|
|
final int countThreshold = conf.getInt(
|
|
|
- HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_THRESHOLD_KEY,
|
|
|
- HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT);
|
|
|
+ Write.ByteArrayManager.COUNT_THRESHOLD_KEY,
|
|
|
+ Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT);
|
|
|
final int countLimit = conf.getInt(
|
|
|
- HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_LIMIT_KEY,
|
|
|
- HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_LIMIT_DEFAULT);
|
|
|
+ Write.ByteArrayManager.COUNT_LIMIT_KEY,
|
|
|
+ Write.ByteArrayManager.COUNT_LIMIT_DEFAULT);
|
|
|
final long countResetTimePeriodMs = conf.getLong(
|
|
|
- HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY,
|
|
|
- HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
|
|
|
+ Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY,
|
|
|
+ Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
|
|
|
writeByteArrayManagerConf = new ByteArrayManager.Conf(
|
|
|
- countThreshold, countLimit, countResetTimePeriodMs);
|
|
|
+ countThreshold, countLimit, countResetTimePeriodMs);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY,
|
|
|
DFS_BLOCK_SIZE_DEFAULT);
|
|
|
defaultReplication = (short) conf.getInt(
|
|
|
DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT);
|
|
|
taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
|
|
|
excludedNodesCacheExpiry = conf.getLong(
|
|
|
- HdfsClientConfigKeys.Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY,
|
|
|
- HdfsClientConfigKeys.Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
|
|
|
- prefetchSize = conf.getLong(HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY,
|
|
|
+ Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY,
|
|
|
+ Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
|
|
|
+ prefetchSize = conf.getLong(Read.PREFETCH_SIZE_KEY,
|
|
|
10 * defaultBlockSize);
|
|
|
numCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
|
|
|
DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
|
|
|
numBlockWriteRetry = conf.getInt(
|
|
|
- HdfsClientConfigKeys.BlockWrite.RETRIES_KEY,
|
|
|
- HdfsClientConfigKeys.BlockWrite.RETRIES_DEFAULT);
|
|
|
+ BlockWrite.RETRIES_KEY,
|
|
|
+ BlockWrite.RETRIES_DEFAULT);
|
|
|
numBlockWriteLocateFollowingRetry = conf.getInt(
|
|
|
- HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
|
|
|
- HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
|
|
|
+ BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
|
|
|
+ BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
|
|
|
blockWriteLocateFollowingInitialDelayMs = conf.getInt(
|
|
|
- HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_KEY,
|
|
|
- HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_DEFAULT);
|
|
|
+ BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_KEY,
|
|
|
+ BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_DEFAULT);
|
|
|
uMask = FsPermission.getUMask(conf);
|
|
|
connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
|
|
|
DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
|
|
@@ -192,30 +222,30 @@ public class DfsClientConf {
|
|
|
DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY,
|
|
|
DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT) * 1000;
|
|
|
slowIoWarningThresholdMs = conf.getLong(
|
|
|
- DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
|
|
|
- DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
|
|
|
-
|
|
|
+ DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
|
|
|
+ DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
|
|
|
+
|
|
|
shortCircuitConf = new ShortCircuitConf(conf);
|
|
|
|
|
|
hedgedReadThresholdMillis = conf.getLong(
|
|
|
- HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY,
|
|
|
- HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_DEFAULT);
|
|
|
+ HedgedRead.THRESHOLD_MILLIS_KEY,
|
|
|
+ HedgedRead.THRESHOLD_MILLIS_DEFAULT);
|
|
|
hedgedReadThreadpoolSize = conf.getInt(
|
|
|
- HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY,
|
|
|
- HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT);
|
|
|
+ HedgedRead.THREADPOOL_SIZE_KEY,
|
|
|
+ HedgedRead.THREADPOOL_SIZE_DEFAULT);
|
|
|
}
|
|
|
|
|
|
private DataChecksum.Type getChecksumType(Configuration conf) {
|
|
|
final String checksum = conf.get(
|
|
|
- DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY,
|
|
|
- DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
|
|
|
+ DFS_CHECKSUM_TYPE_KEY,
|
|
|
+ DFS_CHECKSUM_TYPE_DEFAULT);
|
|
|
try {
|
|
|
return DataChecksum.Type.valueOf(checksum);
|
|
|
} catch(IllegalArgumentException iae) {
|
|
|
- DFSClient.LOG.warn("Bad checksum type: " + checksum + ". Using default "
|
|
|
- + DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
|
|
|
+ LOG.warn("Bad checksum type: {}. Using default {}", checksum,
|
|
|
+ DFS_CHECKSUM_TYPE_DEFAULT);
|
|
|
return DataChecksum.Type.valueOf(
|
|
|
- DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
|
|
|
+ DFS_CHECKSUM_TYPE_DEFAULT);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -465,8 +495,11 @@ public class DfsClientConf {
|
|
|
return shortCircuitConf;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Configuration for short-circuit reads.
|
|
|
+ */
|
|
|
public static class ShortCircuitConf {
|
|
|
- private static final Log LOG = LogFactory.getLog(ShortCircuitConf.class);
|
|
|
+ private static final Logger LOG = DfsClientConf.LOG;
|
|
|
|
|
|
private final int socketCacheCapacity;
|
|
|
private final long socketCacheExpiry;
|
|
@@ -480,9 +513,9 @@ public class DfsClientConf {
|
|
|
private final boolean shortCircuitLocalReads;
|
|
|
private final boolean domainSocketDataTraffic;
|
|
|
private final int shortCircuitStreamsCacheSize;
|
|
|
- private final long shortCircuitStreamsCacheExpiryMs;
|
|
|
+ private final long shortCircuitStreamsCacheExpiryMs;
|
|
|
private final int shortCircuitSharedMemoryWatcherInterruptCheckMs;
|
|
|
-
|
|
|
+
|
|
|
private final boolean shortCircuitMmapEnabled;
|
|
|
private final int shortCircuitMmapCacheSize;
|
|
|
private final long shortCircuitMmapCacheExpiryMs;
|
|
@@ -491,10 +524,6 @@ public class DfsClientConf {
|
|
|
|
|
|
private final long keyProviderCacheExpiryMs;
|
|
|
|
|
|
- @VisibleForTesting
|
|
|
- public BlockReaderFactory.FailureInjector brfFailureInjector =
|
|
|
- new BlockReaderFactory.FailureInjector();
|
|
|
-
|
|
|
public ShortCircuitConf(Configuration conf) {
|
|
|
socketCacheCapacity = conf.getInt(
|
|
|
DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
|
|
@@ -504,66 +533,64 @@ public class DfsClientConf {
|
|
|
DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT);
|
|
|
|
|
|
useLegacyBlockReader = conf.getBoolean(
|
|
|
- DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER,
|
|
|
- DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
|
|
|
+ DFS_CLIENT_USE_LEGACY_BLOCKREADER,
|
|
|
+ DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
|
|
|
useLegacyBlockReaderLocal = conf.getBoolean(
|
|
|
- DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
|
|
|
- DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
|
|
|
+ DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
|
|
|
+ DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
|
|
|
shortCircuitLocalReads = conf.getBoolean(
|
|
|
- HdfsClientConfigKeys.Read.ShortCircuit.KEY,
|
|
|
- HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT);
|
|
|
+ Read.ShortCircuit.KEY,
|
|
|
+ Read.ShortCircuit.DEFAULT);
|
|
|
domainSocketDataTraffic = conf.getBoolean(
|
|
|
- DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
|
|
|
- DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
|
|
|
+ DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
|
|
|
+ DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
|
|
|
domainSocketPath = conf.getTrimmed(
|
|
|
- DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
|
|
- DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT);
|
|
|
-
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL
|
|
|
- + " = " + useLegacyBlockReaderLocal);
|
|
|
- LOG.debug(HdfsClientConfigKeys.Read.ShortCircuit.KEY
|
|
|
- + " = " + shortCircuitLocalReads);
|
|
|
- LOG.debug(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC
|
|
|
- + " = " + domainSocketDataTraffic);
|
|
|
- LOG.debug(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY
|
|
|
- + " = " + domainSocketPath);
|
|
|
- }
|
|
|
+ DFS_DOMAIN_SOCKET_PATH_KEY,
|
|
|
+ DFS_DOMAIN_SOCKET_PATH_DEFAULT);
|
|
|
+
|
|
|
+ LOG.debug(DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL
|
|
|
+ + " = {}", useLegacyBlockReaderLocal);
|
|
|
+ LOG.debug(Read.ShortCircuit.KEY
|
|
|
+ + " = {}", shortCircuitLocalReads);
|
|
|
+ LOG.debug(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC
|
|
|
+ + " = {}", domainSocketDataTraffic);
|
|
|
+ LOG.debug(DFS_DOMAIN_SOCKET_PATH_KEY
|
|
|
+ + " = {}", domainSocketPath);
|
|
|
|
|
|
skipShortCircuitChecksums = conf.getBoolean(
|
|
|
- HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
|
|
|
- HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_DEFAULT);
|
|
|
+ Read.ShortCircuit.SKIP_CHECKSUM_KEY,
|
|
|
+ Read.ShortCircuit.SKIP_CHECKSUM_DEFAULT);
|
|
|
shortCircuitBufferSize = conf.getInt(
|
|
|
- HdfsClientConfigKeys.Read.ShortCircuit.BUFFER_SIZE_KEY,
|
|
|
- HdfsClientConfigKeys.Read.ShortCircuit.BUFFER_SIZE_DEFAULT);
|
|
|
+ Read.ShortCircuit.BUFFER_SIZE_KEY,
|
|
|
+ Read.ShortCircuit.BUFFER_SIZE_DEFAULT);
|
|
|
shortCircuitStreamsCacheSize = conf.getInt(
|
|
|
- HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_SIZE_KEY,
|
|
|
- HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_SIZE_DEFAULT);
|
|
|
+ Read.ShortCircuit.STREAMS_CACHE_SIZE_KEY,
|
|
|
+ Read.ShortCircuit.STREAMS_CACHE_SIZE_DEFAULT);
|
|
|
shortCircuitStreamsCacheExpiryMs = conf.getLong(
|
|
|
- HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
|
|
|
- HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_DEFAULT);
|
|
|
+ Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
|
|
|
+ Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_DEFAULT);
|
|
|
shortCircuitMmapEnabled = conf.getBoolean(
|
|
|
- HdfsClientConfigKeys.Mmap.ENABLED_KEY,
|
|
|
- HdfsClientConfigKeys.Mmap.ENABLED_DEFAULT);
|
|
|
+ Mmap.ENABLED_KEY,
|
|
|
+ Mmap.ENABLED_DEFAULT);
|
|
|
shortCircuitMmapCacheSize = conf.getInt(
|
|
|
- HdfsClientConfigKeys.Mmap.CACHE_SIZE_KEY,
|
|
|
- HdfsClientConfigKeys.Mmap.CACHE_SIZE_DEFAULT);
|
|
|
+ Mmap.CACHE_SIZE_KEY,
|
|
|
+ Mmap.CACHE_SIZE_DEFAULT);
|
|
|
shortCircuitMmapCacheExpiryMs = conf.getLong(
|
|
|
- HdfsClientConfigKeys.Mmap.CACHE_TIMEOUT_MS_KEY,
|
|
|
- HdfsClientConfigKeys.Mmap.CACHE_TIMEOUT_MS_DEFAULT);
|
|
|
+ Mmap.CACHE_TIMEOUT_MS_KEY,
|
|
|
+ Mmap.CACHE_TIMEOUT_MS_DEFAULT);
|
|
|
shortCircuitMmapCacheRetryTimeout = conf.getLong(
|
|
|
- HdfsClientConfigKeys.Mmap.RETRY_TIMEOUT_MS_KEY,
|
|
|
- HdfsClientConfigKeys.Mmap.RETRY_TIMEOUT_MS_DEFAULT);
|
|
|
+ Mmap.RETRY_TIMEOUT_MS_KEY,
|
|
|
+ Mmap.RETRY_TIMEOUT_MS_DEFAULT);
|
|
|
shortCircuitCacheStaleThresholdMs = conf.getLong(
|
|
|
- HdfsClientConfigKeys.ShortCircuit.REPLICA_STALE_THRESHOLD_MS_KEY,
|
|
|
- HdfsClientConfigKeys.ShortCircuit.REPLICA_STALE_THRESHOLD_MS_DEFAULT);
|
|
|
+ ShortCircuit.REPLICA_STALE_THRESHOLD_MS_KEY,
|
|
|
+ ShortCircuit.REPLICA_STALE_THRESHOLD_MS_DEFAULT);
|
|
|
shortCircuitSharedMemoryWatcherInterruptCheckMs = conf.getInt(
|
|
|
- DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
|
|
|
- DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT);
|
|
|
+ DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
|
|
|
+ DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT);
|
|
|
|
|
|
keyProviderCacheExpiryMs = conf.getLong(
|
|
|
- DFSConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS,
|
|
|
- DFSConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT);
|
|
|
+ DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS,
|
|
|
+ DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT);
|
|
|
}
|
|
|
|
|
|
/**
|