|
@@ -17,9 +17,6 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.hdfs;
|
|
|
|
|
|
-import static org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
|
|
|
-import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension
|
|
|
- .EncryptedKeyVersion;
|
|
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
|
@@ -59,6 +56,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_MAX_PACKETS_KEY;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
|
|
@@ -72,7 +71,6 @@ import java.io.FileNotFoundException;
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
|
import java.io.OutputStream;
|
|
|
-import java.lang.reflect.Proxy;
|
|
|
import java.net.InetAddress;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.net.Socket;
|
|
@@ -89,10 +87,10 @@ import java.util.LinkedHashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Random;
|
|
|
-import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.SynchronousQueue;
|
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
import javax.net.SocketFactory;
|
|
@@ -107,7 +105,9 @@ import org.apache.hadoop.crypto.CryptoInputStream;
|
|
|
import org.apache.hadoop.crypto.CryptoOutputStream;
|
|
|
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
|
|
import org.apache.hadoop.crypto.key.KeyProvider;
|
|
|
+import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
|
|
|
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
|
|
|
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
|
|
|
import org.apache.hadoop.fs.BlockLocation;
|
|
|
import org.apache.hadoop.fs.BlockStorageLocation;
|
|
|
import org.apache.hadoop.fs.CacheFlag;
|
|
@@ -135,8 +135,8 @@ import org.apache.hadoop.fs.XAttr;
|
|
|
import org.apache.hadoop.fs.XAttrSetFlag;
|
|
|
import org.apache.hadoop.fs.permission.AclEntry;
|
|
|
import org.apache.hadoop.fs.permission.AclStatus;
|
|
|
-import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.fs.permission.FsAction;
|
|
|
+import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
|
|
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
|
|
import org.apache.hadoop.hdfs.net.Peer;
|
|
@@ -202,7 +202,6 @@ import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
|
|
|
import org.apache.hadoop.ipc.Client;
|
|
|
import org.apache.hadoop.ipc.RPC;
|
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
|
-import org.apache.hadoop.ipc.RpcInvocationHandler;
|
|
|
import org.apache.hadoop.net.DNS;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.security.AccessControlException;
|
|
@@ -217,17 +216,16 @@ import org.apache.hadoop.util.DataChecksum;
|
|
|
import org.apache.hadoop.util.DataChecksum.Type;
|
|
|
import org.apache.hadoop.util.Progressable;
|
|
|
import org.apache.hadoop.util.Time;
|
|
|
+import org.htrace.Sampler;
|
|
|
+import org.htrace.Span;
|
|
|
+import org.htrace.Trace;
|
|
|
+import org.htrace.TraceScope;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.base.Joiner;
|
|
|
import com.google.common.base.Preconditions;
|
|
|
import com.google.common.collect.Lists;
|
|
|
import com.google.common.net.InetAddresses;
|
|
|
-import org.htrace.Sampler;
|
|
|
-import org.htrace.Span;
|
|
|
-import org.htrace.Trace;
|
|
|
-import org.htrace.TraceScope;
|
|
|
-import org.htrace.impl.ProbabilitySampler;
|
|
|
|
|
|
/********************************************************
|
|
|
* DFSClient can connect to a Hadoop Filesystem and
|
|
@@ -294,6 +292,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
final int ioBufferSize;
|
|
|
final ChecksumOpt defaultChecksumOpt;
|
|
|
final int writePacketSize;
|
|
|
+ final int writeMaxPackets;
|
|
|
final int socketTimeout;
|
|
|
final int socketCacheCapacity;
|
|
|
final long socketCacheExpiry;
|
|
@@ -364,6 +363,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
/** dfs.write.packet.size is an internal config variable */
|
|
|
writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
|
|
|
DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
|
|
|
+ writeMaxPackets = conf.getInt(DFS_CLIENT_WRITE_MAX_PACKETS_KEY,
|
|
|
+ DFS_CLIENT_WRITE_MAX_PACKETS_DEFAULT);
|
|
|
defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY,
|
|
|
DFS_BLOCK_SIZE_DEFAULT);
|
|
|
defaultReplication = (short) conf.getInt(
|