|
@@ -176,6 +176,9 @@ public class DFSClient implements java.io.Closeable {
|
|
|
public static final Log LOG = LogFactory.getLog(DFSClient.class);
|
|
|
public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
|
|
|
static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
|
|
|
+
|
|
|
+ private final Configuration conf;
|
|
|
+ private final Conf dfsClientConf;
|
|
|
final ClientProtocol namenode;
|
|
|
/* The service used for delegation tokens */
|
|
|
private Text dtService;
|
|
@@ -186,14 +189,11 @@ public class DFSClient implements java.io.Closeable {
|
|
|
private volatile FsServerDefaults serverDefaults;
|
|
|
private volatile long serverDefaultsLastUpdate;
|
|
|
final String clientName;
|
|
|
- Configuration conf;
|
|
|
SocketFactory socketFactory;
|
|
|
final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
|
|
|
final FileSystem.Statistics stats;
|
|
|
- final int hdfsTimeout; // timeout value for a DFS operation.
|
|
|
private final String authority;
|
|
|
final PeerCache peerCache;
|
|
|
- final Conf dfsClientConf;
|
|
|
private Random r = new Random();
|
|
|
private SocketAddress[] localInterfaceAddrs;
|
|
|
private DataEncryptionKey encryptionKey;
|
|
@@ -202,7 +202,8 @@ public class DFSClient implements java.io.Closeable {
|
|
|
/**
|
|
|
* DFSClient configuration
|
|
|
*/
|
|
|
- static class Conf {
|
|
|
+ public static class Conf {
|
|
|
+ final int hdfsTimeout; // timeout value for a DFS operation.
|
|
|
final int maxFailoverAttempts;
|
|
|
final int failoverSleepBaseMillis;
|
|
|
final int failoverSleepMaxMillis;
|
|
@@ -225,18 +226,25 @@ public class DFSClient implements java.io.Closeable {
|
|
|
final short defaultReplication;
|
|
|
final String taskId;
|
|
|
final FsPermission uMask;
|
|
|
- final boolean useLegacyBlockReaderLocal;
|
|
|
final boolean connectToDnViaHostname;
|
|
|
final boolean getHdfsBlocksMetadataEnabled;
|
|
|
final int getFileBlockStorageLocationsNumThreads;
|
|
|
final int getFileBlockStorageLocationsTimeout;
|
|
|
+
|
|
|
+ final boolean useLegacyBlockReader;
|
|
|
+ final boolean useLegacyBlockReaderLocal;
|
|
|
final String domainSocketPath;
|
|
|
final boolean skipShortCircuitChecksums;
|
|
|
final int shortCircuitBufferSize;
|
|
|
final boolean shortCircuitLocalReads;
|
|
|
final boolean domainSocketDataTraffic;
|
|
|
+ final int shortCircuitStreamsCacheSize;
|
|
|
+ final long shortCircuitStreamsCacheExpiryMs;
|
|
|
+
|
|
|
+ public Conf(Configuration conf) {
|
|
|
+ // The hdfsTimeout is currently the same as the ipc timeout
|
|
|
+ hdfsTimeout = Client.getTimeout(conf);
|
|
|
|
|
|
- Conf(Configuration conf) {
|
|
|
maxFailoverAttempts = conf.getInt(
|
|
|
DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
|
|
|
DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
|
|
@@ -275,19 +283,15 @@ public class DFSClient implements java.io.Closeable {
|
|
|
DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
|
|
|
prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
|
|
|
10 * defaultBlockSize);
|
|
|
- timeWindow = conf
|
|
|
- .getInt(DFS_CLIENT_RETRY_WINDOW_BASE, 3000);
|
|
|
+ timeWindow = conf.getInt(DFS_CLIENT_RETRY_WINDOW_BASE, 3000);
|
|
|
nCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
|
|
|
DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
|
|
|
nBlockWriteRetry = conf.getInt(DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY,
|
|
|
DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT);
|
|
|
- nBlockWriteLocateFollowingRetry = conf
|
|
|
- .getInt(DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
|
|
|
- DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
|
|
|
+ nBlockWriteLocateFollowingRetry = conf.getInt(
|
|
|
+ DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
|
|
|
+ DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
|
|
|
uMask = FsPermission.getUMask(conf);
|
|
|
- useLegacyBlockReaderLocal = conf.getBoolean(
|
|
|
- DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
|
|
|
- DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
|
|
|
connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
|
|
|
DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
|
|
|
getHdfsBlocksMetadataEnabled = conf.getBoolean(
|
|
@@ -299,20 +303,50 @@ public class DFSClient implements java.io.Closeable {
|
|
|
getFileBlockStorageLocationsTimeout = conf.getInt(
|
|
|
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT,
|
|
|
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT);
|
|
|
- domainSocketPath = conf.getTrimmed(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
|
|
+
|
|
|
+ useLegacyBlockReader = conf.getBoolean(
|
|
|
+ DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER,
|
|
|
+ DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
|
|
|
+ useLegacyBlockReaderLocal = conf.getBoolean(
|
|
|
+ DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
|
|
|
+ DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
|
|
|
+ shortCircuitLocalReads = conf.getBoolean(
|
|
|
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
|
|
|
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
|
|
|
+ domainSocketDataTraffic = conf.getBoolean(
|
|
|
+ DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
|
|
|
+ DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
|
|
|
+ domainSocketPath = conf.getTrimmed(
|
|
|
+ DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
|
|
DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT);
|
|
|
+
|
|
|
+ if (BlockReaderLocal.LOG.isDebugEnabled()) {
|
|
|
+ BlockReaderLocal.LOG.debug(
|
|
|
+ DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL
|
|
|
+ + " = " + useLegacyBlockReaderLocal);
|
|
|
+ BlockReaderLocal.LOG.debug(
|
|
|
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY
|
|
|
+ + " = " + shortCircuitLocalReads);
|
|
|
+ BlockReaderLocal.LOG.debug(
|
|
|
+ DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC
|
|
|
+ + " = " + domainSocketDataTraffic);
|
|
|
+ BlockReaderLocal.LOG.debug(
|
|
|
+ DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY
|
|
|
+ + " = " + domainSocketPath);
|
|
|
+ }
|
|
|
+
|
|
|
skipShortCircuitChecksums = conf.getBoolean(
|
|
|
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
|
|
|
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
|
|
|
shortCircuitBufferSize = conf.getInt(
|
|
|
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
|
|
|
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
|
|
|
- shortCircuitLocalReads = conf.getBoolean(
|
|
|
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
|
|
|
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
|
|
|
- domainSocketDataTraffic = conf.getBoolean(
|
|
|
- DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
|
|
|
- DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
|
|
|
+ shortCircuitStreamsCacheSize = conf.getInt(
|
|
|
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY,
|
|
|
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT);
|
|
|
+ shortCircuitStreamsCacheExpiryMs = conf.getLong(
|
|
|
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
|
|
|
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT);
|
|
|
}
|
|
|
|
|
|
private DataChecksum.Type getChecksumType(Configuration conf) {
|
|
@@ -358,10 +392,14 @@ public class DFSClient implements java.io.Closeable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- Conf getConf() {
|
|
|
+ public Conf getConf() {
|
|
|
return dfsClientConf;
|
|
|
}
|
|
|
|
|
|
+ Configuration getConfiguration() {
|
|
|
+ return conf;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* A map from file names to {@link DFSOutputStream} objects
|
|
|
* that are currently being written by this client.
|
|
@@ -424,8 +462,6 @@ public class DFSClient implements java.io.Closeable {
|
|
|
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
|
|
|
this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
|
|
|
|
|
|
- // The hdfsTimeout is currently the same as the ipc timeout
|
|
|
- this.hdfsTimeout = Client.getTimeout(conf);
|
|
|
this.ugi = UserGroupInformation.getCurrentUser();
|
|
|
|
|
|
this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
|
|
@@ -540,21 +576,13 @@ public class DFSClient implements java.io.Closeable {
|
|
|
}
|
|
|
|
|
|
int getHdfsTimeout() {
|
|
|
- return hdfsTimeout;
|
|
|
+ return dfsClientConf.hdfsTimeout;
|
|
|
}
|
|
|
|
|
|
String getClientName() {
|
|
|
return clientName;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * @return whether the client should use hostnames instead of IPs
|
|
|
- * when connecting to DataNodes
|
|
|
- */
|
|
|
- boolean connectToDnViaHostname() {
|
|
|
- return dfsClientConf.connectToDnViaHostname;
|
|
|
- }
|
|
|
-
|
|
|
void checkOpen() throws IOException {
|
|
|
if (!clientRunning) {
|
|
|
IOException result = new IOException("Filesystem closed");
|
|
@@ -802,6 +830,7 @@ public class DFSClient implements java.io.Closeable {
|
|
|
* @throws IOException
|
|
|
* @deprecated Use Token.renew instead.
|
|
|
*/
|
|
|
+ @Deprecated
|
|
|
public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
|
|
|
throws InvalidToken, IOException {
|
|
|
LOG.info("Renewing " + DelegationTokenIdentifier.stringifyToken(token));
|
|
@@ -846,6 +875,7 @@ public class DFSClient implements java.io.Closeable {
|
|
|
* @throws IOException
|
|
|
* @deprecated Use Token.cancel instead.
|
|
|
*/
|
|
|
+ @Deprecated
|
|
|
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
|
|
|
throws InvalidToken, IOException {
|
|
|
LOG.info("Cancelling " + DelegationTokenIdentifier.stringifyToken(token));
|
|
@@ -947,6 +977,11 @@ public class DFSClient implements java.io.Closeable {
|
|
|
return dfsClientConf.defaultReplication;
|
|
|
}
|
|
|
|
|
|
+ public LocatedBlocks getLocatedBlocks(String src, long start)
|
|
|
+ throws IOException {
|
|
|
+ return getLocatedBlocks(src, start, dfsClientConf.prefetchSize);
|
|
|
+ }
|
|
|
+
|
|
|
/*
|
|
|
* This is just a wrapper around callGetBlockLocations, but non-static so that
|
|
|
* we can stub it out for tests.
|
|
@@ -1675,10 +1710,10 @@ public class DFSClient implements java.io.Closeable {
|
|
|
* @param socketFactory to create sockets to connect to DNs
|
|
|
* @param socketTimeout timeout to use when connecting and waiting for a response
|
|
|
* @param encryptionKey the key needed to communicate with DNs in this cluster
|
|
|
- * @param connectToDnViaHostname {@link #connectToDnViaHostname()}
|
|
|
+ * @param connectToDnViaHostname whether the client should use hostnames instead of IPs
|
|
|
* @return The checksum
|
|
|
*/
|
|
|
- static MD5MD5CRC32FileChecksum getFileChecksum(String src,
|
|
|
+ private static MD5MD5CRC32FileChecksum getFileChecksum(String src,
|
|
|
String clientName,
|
|
|
ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
|
|
|
DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
|