|
@@ -42,13 +42,17 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.BlockLocation;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
|
|
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
|
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
|
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
|
|
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
|
|
|
+import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
|
|
|
+import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
|
|
|
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
|
|
|
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
@@ -57,6 +61,7 @@ import org.apache.hadoop.io.retry.RetryPolicy;
|
|
|
import org.apache.hadoop.io.retry.RetryProxy;
|
|
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
|
|
import org.apache.hadoop.ipc.RPC;
|
|
|
+import org.apache.hadoop.ipc.RemoteException;
|
|
|
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.net.NodeBase;
|
|
@@ -807,23 +812,16 @@ public class DFSUtil {
|
|
|
/** Create a {@link NameNode} proxy */
|
|
|
public static ClientProtocol createNamenode(InetSocketAddress nameNodeAddr,
|
|
|
Configuration conf) throws IOException {
|
|
|
- return createNamenode(nameNodeAddr, conf,
|
|
|
+ return createNamenode(nameNodeAddr, conf,
|
|
|
UserGroupInformation.getCurrentUser());
|
|
|
}
|
|
|
|
|
|
/** Create a {@link NameNode} proxy */
|
|
|
- public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr,
|
|
|
+ public static ClientProtocol createNamenode(InetSocketAddress nameNodeAddr,
|
|
|
Configuration conf, UserGroupInformation ugi) throws IOException {
|
|
|
- /**
|
|
|
- * Currently we have simply burnt-in support for a SINGLE
|
|
|
- * protocol - protocolPB. This will be replaced
|
|
|
- * by a way to pick the right protocol based on the
|
|
|
- * version of the target server.
|
|
|
- */
|
|
|
- return new org.apache.hadoop.hdfs.protocolPB.
|
|
|
- ClientNamenodeProtocolTranslatorPB(nameNodeAddr, conf, ugi);
|
|
|
+ return createNNProxyWithClientProtocol(nameNodeAddr, conf, ugi, true);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/** Create a {@link ClientDatanodeProtocol} proxy */
|
|
|
public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
|
|
|
DatanodeID datanodeid, Configuration conf, int socketTimeout,
|
|
@@ -846,31 +844,115 @@ public class DFSUtil {
|
|
|
SocketFactory factory) throws IOException {
|
|
|
return new ClientDatanodeProtocolTranslatorPB(addr, ticket, conf, factory);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * Build a NamenodeProtocol connection to the namenode and set up the retry
|
|
|
- * policy
|
|
|
+ * Build a proxy connection to the namenode with NamenodeProtocol and set up
|
|
|
+ * the proxy with retry policy.
|
|
|
+ * @param address - namenode address
|
|
|
+ * @param conf - configuration
|
|
|
+ * @param ugi - User group information
|
|
|
+ * @return a proxy connection with NamenodeProtocol
|
|
|
+ * @throws - IOException
|
|
|
*/
|
|
|
public static NamenodeProtocolTranslatorPB createNNProxyWithNamenodeProtocol(
|
|
|
InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
|
|
|
throws IOException {
|
|
|
- RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(5, 200,
|
|
|
- TimeUnit.MILLISECONDS);
|
|
|
- Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap
|
|
|
- = new HashMap<Class<? extends Exception>, RetryPolicy>();
|
|
|
- RetryPolicy methodPolicy = RetryPolicies.retryByException(timeoutPolicy,
|
|
|
- exceptionToPolicyMap);
|
|
|
- Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String, RetryPolicy>();
|
|
|
- methodNameToPolicyMap.put("getBlocks", methodPolicy);
|
|
|
- methodNameToPolicyMap.put("getAccessKeys", methodPolicy);
|
|
|
- RPC.setProtocolEngine(conf, NamenodeProtocolPB.class,
|
|
|
- ProtobufRpcEngine.class);
|
|
|
- NamenodeProtocolPB proxy = RPC.getProxy(NamenodeProtocolPB.class, RPC
|
|
|
- .getProtocolVersion(NamenodeProtocolPB.class), address, ugi, conf,
|
|
|
- NetUtils.getDefaultSocketFactory(conf));
|
|
|
- NamenodeProtocolPB retryProxy = (NamenodeProtocolPB) RetryProxy.create(
|
|
|
- NamenodeProtocolPB.class, proxy, methodNameToPolicyMap);
|
|
|
- return new NamenodeProtocolTranslatorPB(retryProxy);
|
|
|
+ return createNNProxyWithNamenodeProtocol(address, conf, ugi, true);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Build a proxy connection to the namenode with NamenodeProtocol.
|
|
|
+ * @param address - namenode address
|
|
|
+ * @param conf - configuration
|
|
|
+ * @param ugi - User group information
|
|
|
+ * @param withRetries - indicates whether to create retry proxy or not
|
|
|
+ * @return a proxy connection with NamenodeProtocol
|
|
|
+ * @throws - IOException
|
|
|
+ */
|
|
|
+ public static NamenodeProtocolTranslatorPB createNNProxyWithNamenodeProtocol(
|
|
|
+ InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
|
|
|
+ boolean withRetries) throws IOException {
|
|
|
+ NamenodeProtocolPB proxy = (NamenodeProtocolPB) createNameNodeProxy(
|
|
|
+ address, conf, ugi, NamenodeProtocolPB.class);
|
|
|
+ if (withRetries) { // create the proxy with retries
|
|
|
+ RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(5, 200,
|
|
|
+ TimeUnit.MILLISECONDS);
|
|
|
+ Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap
|
|
|
+ = new HashMap<Class<? extends Exception>, RetryPolicy>();
|
|
|
+ RetryPolicy methodPolicy = RetryPolicies.retryByException(timeoutPolicy,
|
|
|
+ exceptionToPolicyMap);
|
|
|
+ Map<String, RetryPolicy> methodNameToPolicyMap
|
|
|
+ = new HashMap<String, RetryPolicy>();
|
|
|
+ methodNameToPolicyMap.put("getBlocks", methodPolicy);
|
|
|
+ methodNameToPolicyMap.put("getAccessKeys", methodPolicy);
|
|
|
+ proxy = (NamenodeProtocolPB) RetryProxy.create(NamenodeProtocolPB.class,
|
|
|
+ proxy, methodNameToPolicyMap);
|
|
|
+ }
|
|
|
+ return new NamenodeProtocolTranslatorPB(proxy);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Build a proxy connection to the namenode with ClientProtocol.
|
|
|
+ * @param address - namenode address
|
|
|
+ * @param conf - configuration
|
|
|
+ * @param ugi - User group information
|
|
|
+ * @param withRetries - indicates whether to create retry proxy or not
|
|
|
+ * @return a proxy connection with ClientProtocol
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public static ClientNamenodeProtocolTranslatorPB createNNProxyWithClientProtocol(
|
|
|
+ InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
|
|
|
+ boolean withRetries) throws IOException {
|
|
|
+ ClientNamenodeProtocolPB proxy
|
|
|
+ = (ClientNamenodeProtocolPB) createNameNodeProxy(address, conf, ugi,
|
|
|
+ ClientNamenodeProtocolPB.class);
|
|
|
+ if (withRetries) { // create the proxy with retries
|
|
|
+ proxy = createNameNodeProxyWithRetries(proxy);
|
|
|
+ }
|
|
|
+ return new ClientNamenodeProtocolTranslatorPB(proxy);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Creates the retry proxy by setting up the retry policy.
|
|
|
+ * @param proxy - non retry proxy connection
|
|
|
+ * @return a retry proxy connection
|
|
|
+ */
|
|
|
+ public static ClientNamenodeProtocolPB createNameNodeProxyWithRetries(
|
|
|
+ ClientNamenodeProtocolPB proxy) {
|
|
|
+ RetryPolicy createPolicy = RetryPolicies
|
|
|
+ .retryUpToMaximumCountWithFixedSleep(5,
|
|
|
+ HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
|
|
|
+
|
|
|
+ Map<Class<? extends Exception>, RetryPolicy> remoteExceptionToPolicyMap
|
|
|
+ = new HashMap<Class<? extends Exception>, RetryPolicy>();
|
|
|
+ remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class,
|
|
|
+ createPolicy);
|
|
|
+
|
|
|
+ Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap
|
|
|
+ = new HashMap<Class<? extends Exception>, RetryPolicy>();
|
|
|
+ exceptionToPolicyMap.put(RemoteException.class, RetryPolicies
|
|
|
+ .retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL,
|
|
|
+ remoteExceptionToPolicyMap));
|
|
|
+ RetryPolicy methodPolicy = RetryPolicies.retryByException(
|
|
|
+ RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
|
|
|
+ Map<String, RetryPolicy> methodNameToPolicyMap
|
|
|
+ = new HashMap<String, RetryPolicy>();
|
|
|
+
|
|
|
+ methodNameToPolicyMap.put("create", methodPolicy);
|
|
|
+
|
|
|
+ ClientNamenodeProtocolPB retryProxy = (ClientNamenodeProtocolPB) RetryProxy
|
|
|
+ .create(ClientNamenodeProtocolPB.class, proxy, methodNameToPolicyMap);
|
|
|
+ return retryProxy;
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ private static Object createNameNodeProxy(InetSocketAddress address,
|
|
|
+ Configuration conf, UserGroupInformation ugi, Class xface)
|
|
|
+ throws IOException {
|
|
|
+ RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class);
|
|
|
+ Object proxy = RPC.getProxy(xface, RPC.getProtocolVersion(xface), address,
|
|
|
+ ugi, conf, NetUtils.getDefaultSocketFactory(conf));
|
|
|
+ return proxy;
|
|
|
}
|
|
|
|
|
|
/**
|