|
@@ -18,6 +18,9 @@
|
|
|
package org.apache.hadoop.dfs;
|
|
|
|
|
|
import org.apache.hadoop.io.*;
|
|
|
+import org.apache.hadoop.io.retry.RetryPolicies;
|
|
|
+import org.apache.hadoop.io.retry.RetryPolicy;
|
|
|
+import org.apache.hadoop.io.retry.RetryProxy;
|
|
|
import org.apache.hadoop.fs.*;
|
|
|
import org.apache.hadoop.ipc.*;
|
|
|
import org.apache.hadoop.conf.*;
|
|
@@ -28,6 +31,7 @@ import org.apache.commons.logging.*;
|
|
|
import java.io.*;
|
|
|
import java.net.*;
|
|
|
import java.util.*;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
/********************************************************
|
|
|
* DFSClient can connect to a Hadoop Filesystem and
|
|
@@ -94,6 +98,46 @@ class DFSClient implements FSConstants {
|
|
|
Runtime.getRuntime().addShutdownHook(clientFinalizer);
|
|
|
}
|
|
|
|
|
|
+ private static ClientProtocol createNamenode(
|
|
|
+ InetSocketAddress nameNodeAddr, Configuration conf)
|
|
|
+ throws IOException {
|
|
|
+ RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
|
|
|
+ 5, 200, TimeUnit.MILLISECONDS);
|
|
|
+ RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
|
|
|
+ 5, LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
|
|
|
+
|
|
|
+ Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
|
|
|
+ new HashMap<Class<? extends Exception>, RetryPolicy>();
|
|
|
+ exceptionToPolicyMap.put(SocketTimeoutException.class, timeoutPolicy);
|
|
|
+ exceptionToPolicyMap.put(AlreadyBeingCreatedException.class, createPolicy);
|
|
|
+
|
|
|
+ RetryPolicy methodPolicy = RetryPolicies.retryByException(
|
|
|
+ RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
|
|
|
+ Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
|
|
|
+
|
|
|
+ methodNameToPolicyMap.put("open", methodPolicy);
|
|
|
+ methodNameToPolicyMap.put("setReplication", methodPolicy);
|
|
|
+ methodNameToPolicyMap.put("abandonBlock", methodPolicy);
|
|
|
+ methodNameToPolicyMap.put("abandonFileInProgress", methodPolicy);
|
|
|
+ methodNameToPolicyMap.put("reportBadBlocks", methodPolicy);
|
|
|
+ methodNameToPolicyMap.put("exists", methodPolicy);
|
|
|
+ methodNameToPolicyMap.put("isDir", methodPolicy);
|
|
|
+ methodNameToPolicyMap.put("getListing", methodPolicy);
|
|
|
+ methodNameToPolicyMap.put("getHints", methodPolicy);
|
|
|
+ methodNameToPolicyMap.put("renewLease", methodPolicy);
|
|
|
+ methodNameToPolicyMap.put("getStats", methodPolicy);
|
|
|
+ methodNameToPolicyMap.put("getDatanodeReport", methodPolicy);
|
|
|
+ methodNameToPolicyMap.put("getBlockSize", methodPolicy);
|
|
|
+ methodNameToPolicyMap.put("getEditLogSize", methodPolicy);
|
|
|
+ methodNameToPolicyMap.put("complete", methodPolicy);
|
|
|
+ methodNameToPolicyMap.put("getEditLogSize", methodPolicy);
|
|
|
+ methodNameToPolicyMap.put("create", methodPolicy);
|
|
|
+
|
|
|
+ return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
|
|
|
+ RPC.getProxy(ClientProtocol.class,
|
|
|
+ ClientProtocol.versionID, nameNodeAddr, conf),
|
|
|
+ methodNameToPolicyMap);
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* Create a new DFSClient connected to the given namenode server.
|
|
@@ -101,8 +145,7 @@ class DFSClient implements FSConstants {
|
|
|
public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf)
|
|
|
throws IOException {
|
|
|
this.conf = conf;
|
|
|
- this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class,
|
|
|
- ClientProtocol.versionID, nameNodeAddr, conf);
|
|
|
+ this.namenode = createNamenode(nameNodeAddr, conf);
|
|
|
String taskId = conf.get("mapred.task.id");
|
|
|
if (taskId != null) {
|
|
|
this.clientName = "DFSClient_" + taskId;
|
|
@@ -160,19 +203,12 @@ class DFSClient implements FSConstants {
|
|
|
}
|
|
|
|
|
|
public long getBlockSize(UTF8 f) throws IOException {
|
|
|
- int retries = 4;
|
|
|
- while (true) {
|
|
|
- try {
|
|
|
- return namenode.getBlockSize(f.toString());
|
|
|
- } catch (IOException ie) {
|
|
|
- if (--retries == 0) {
|
|
|
- LOG.warn("Problem getting block size: " +
|
|
|
- StringUtils.stringifyException(ie));
|
|
|
- throw ie;
|
|
|
- }
|
|
|
- LOG.debug("Problem getting block size: " +
|
|
|
- StringUtils.stringifyException(ie));
|
|
|
- }
|
|
|
+ try {
|
|
|
+ return namenode.getBlockSize(f.toString());
|
|
|
+ } catch (IOException ie) {
|
|
|
+ LOG.warn("Problem getting block size: " +
|
|
|
+ StringUtils.stringifyException(ie));
|
|
|
+ throw ie;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1133,31 +1169,8 @@ class DFSClient implements FSConstants {
|
|
|
}
|
|
|
|
|
|
private LocatedBlock locateNewBlock() throws IOException {
|
|
|
- int retries = 3;
|
|
|
- while (true) {
|
|
|
- while (true) {
|
|
|
- try {
|
|
|
- return namenode.create(src.toString(), clientName.toString(),
|
|
|
- overwrite, replication, blockSize);
|
|
|
- } catch (RemoteException e) {
|
|
|
- if (--retries == 0 ||
|
|
|
- !AlreadyBeingCreatedException.class.getName().
|
|
|
- equals(e.getClassName())) {
|
|
|
- throw e;
|
|
|
- } else {
|
|
|
- // because failed tasks take upto LEASE_PERIOD to
|
|
|
- // release their pendingCreates files, if the file
|
|
|
- // we want to create is already being created,
|
|
|
- // wait and try again.
|
|
|
- LOG.info(StringUtils.stringifyException(e));
|
|
|
- try {
|
|
|
- Thread.sleep(LEASE_SOFTLIMIT_PERIOD);
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ return namenode.create(src.toString(), clientName.toString(),
|
|
|
+ overwrite, replication, blockSize);
|
|
|
}
|
|
|
|
|
|
private LocatedBlock locateFollowingBlock(long start
|