@@ -54,6 +54,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.ByteRangeInputStream;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
@@ -86,6 +87,7 @@ import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
 import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
 import org.apache.hadoop.hdfs.web.resources.UserParam;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryUtils;
 import org.apache.hadoop.ipc.RemoteException;
@@ -121,7 +123,7 @@ public class WebHdfsFileSystem extends FileSystem
 
   /** SPNEGO authenticator */
   private static final KerberosUgiAuthenticator AUTH = new KerberosUgiAuthenticator();
-  /** Default connection factory may be overriden in tests to use smaller timeout values */
+  /** Default connection factory may be overridden in tests to use smaller timeout values */
   URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
   /** Configures connections for AuthenticatedURL */
   private final ConnectionConfigurator CONN_CONFIGURATOR =
@@ -161,12 +163,13 @@ public class WebHdfsFileSystem extends FileSystem
   }
 
   private UserGroupInformation ugi;
-  private InetSocketAddress nnAddr;
   private URI uri;
   private boolean hasInitedToken;
   private Token<?> delegationToken;
   private RetryPolicy retryPolicy = null;
   private Path workingDir;
+  private InetSocketAddress nnAddrs[];
+  private int currentNNAddrIndex;
 
   /**
    * Return the protocol scheme for the FileSystem.
@@ -176,7 +179,7 @@ public class WebHdfsFileSystem extends FileSystem
    */
   @Override
   public String getScheme() {
-    return "webhdfs";
+    return SCHEME;
   }
 
   @Override
@@ -185,20 +188,42 @@ public class WebHdfsFileSystem extends FileSystem
     super.initialize(uri, conf);
     setConf(conf);
     ugi = UserGroupInformation.getCurrentUser();
+
     try {
-      this.uri = new URI(uri.getScheme(), uri.getAuthority(), null, null, null);
+      this.uri = new URI(uri.getScheme(), uri.getAuthority(), null,
+          null, null);
+      this.nnAddrs = DFSUtil.resolve(this.uri, getDefaultPort(), conf);
     } catch (URISyntaxException e) {
       throw new IllegalArgumentException(e);
     }
-    this.nnAddr = NetUtils.createSocketAddr(uri.getAuthority(), getDefaultPort());
-    this.retryPolicy =
-        RetryUtils.getDefaultRetryPolicy(
-            conf,
-            DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY,
-            DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
-            DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_POLICY_SPEC_KEY,
-            DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
-            SafeModeException.class);
+
+    if (!HAUtil.isLogicalUri(conf, this.uri)) {
+      this.retryPolicy =
+          RetryUtils.getDefaultRetryPolicy(
+              conf,
+              DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY,
+              DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
+              DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_POLICY_SPEC_KEY,
+              DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
+              SafeModeException.class);
+    } else {
+
+      int maxFailoverAttempts = conf.getInt(
+          DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
+          DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
+      int failoverSleepBaseMillis = conf.getInt(
+          DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY,
+          DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT);
+      int failoverSleepMaxMillis = conf.getInt(
+          DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY,
+          DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT);
+
+      this.retryPolicy = RetryPolicies
+          .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
+              maxFailoverAttempts, failoverSleepBaseMillis,
+              failoverSleepMaxMillis);
+    }
+
     this.workingDir = getHomeDirectory();
 
     if (UserGroupInformation.isSecurityEnabled()) {
@@ -350,6 +375,19 @@ public class WebHdfsFileSystem extends FileSystem
     return ((RemoteException)ioe).unwrapRemoteException();
   }
 
+  private synchronized InetSocketAddress getCurrentNNAddr() {
+    return nnAddrs[currentNNAddrIndex];
+  }
+
+  /**
+   * Reset the appropriate state to gracefully fail over to another name node
+   */
+  private synchronized void resetStateToFailOver() {
+    currentNNAddrIndex = (currentNNAddrIndex + 1) % nnAddrs.length;
+    delegationToken = null;
+    hasInitedToken = false;
+  }
+
   /**
    * Return a URL pointing to given path on the namenode.
    *
@@ -359,6 +397,7 @@ public class WebHdfsFileSystem extends FileSystem
    * @throws IOException on error constructing the URL
    */
   private URL getNamenodeURL(String path, String query) throws IOException {
+    InetSocketAddress nnAddr = getCurrentNNAddr();
     final URL url = new URL("http", nnAddr.getHostName(),
         nnAddr.getPort(), path + '?' + query);
     if (LOG.isTraceEnabled()) {
@@ -416,38 +455,28 @@ public class WebHdfsFileSystem extends FileSystem
    */
   private Map<?, ?> run(final HttpOpParam.Op op, final Path fspath,
       final Param<?,?>... parameters) throws IOException {
-    return new Runner(op, fspath, parameters).run().json;
+    return new FsPathRunner(op, fspath, parameters).run().json;
   }
 
   /**
   * This class is for initialing a HTTP connection, connecting to server,
   * obtaining a response, and also handling retry on failures.
   */
-  class Runner {
-    private final HttpOpParam.Op op;
-    private final URL url;
+  abstract class AbstractRunner {
+    abstract protected URL getUrl() throws IOException;
+
+    protected final HttpOpParam.Op op;
     private final boolean redirected;
 
     private boolean checkRetry;
-    private HttpURLConnection conn = null;
+    protected HttpURLConnection conn = null;
    private Map<?, ?> json = null;
 
-    Runner(final HttpOpParam.Op op, final URL url, final boolean redirected) {
+    protected AbstractRunner(final HttpOpParam.Op op, boolean redirected) {
       this.op = op;
-      this.url = url;
       this.redirected = redirected;
     }
 
-    Runner(final HttpOpParam.Op op, final Path fspath,
-        final Param<?,?>... parameters) throws IOException {
-      this(op, toUrl(op, fspath, parameters), false);
-    }
-
-    Runner(final HttpOpParam.Op op, final HttpURLConnection conn) {
-      this(op, null, false);
-      this.conn = conn;
-    }
-
     private HttpURLConnection getHttpUrlConnection(final URL url)
         throws IOException, AuthenticationException {
       UserGroupInformation connectUgi = ugi.getRealUser();
@@ -495,6 +524,7 @@ public class WebHdfsFileSystem extends FileSystem
 
     private void init() throws IOException {
       checkRetry = !redirected;
+      URL url = getUrl();
       try {
         conn = getHttpUrlConnection(url);
       } catch(AuthenticationException ae) {
@@ -521,7 +551,23 @@ public class WebHdfsFileSystem extends FileSystem
       }
     }
 
-    Runner run() throws IOException {
+    AbstractRunner run() throws IOException {
+      /**
+       * Do the real work.
+       *
+       * There are three cases that the code inside the loop can throw an
+       * IOException:
+       *
+       * <ul>
+       * <li>The connection has failed (e.g., ConnectException,
+       * @see FailoverOnNetworkExceptionRetry for more details)</li>
+       * <li>The namenode enters the standby state (i.e., StandbyException).</li>
+       * <li>The server returns errors for the command (i.e., RemoteException)</li>
+       * </ul>
+       *
+       * The call to shouldRetry() will conduct the retry policy. The policy
+       * examines the exception and swallows it if it decides to rerun the work.
+       */
       for(int retry = 0; ; retry++) {
         try {
           init();
@@ -539,14 +585,25 @@ public class WebHdfsFileSystem extends FileSystem
 
     private void shouldRetry(final IOException ioe, final int retry
         ) throws IOException {
+      InetSocketAddress nnAddr = getCurrentNNAddr();
       if (checkRetry) {
         try {
           final RetryPolicy.RetryAction a = retryPolicy.shouldRetry(
               ioe, retry, 0, true);
-          if (a.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
+
+          boolean isRetry = a.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
+          boolean isFailoverAndRetry =
+              a.action == RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY;
+
+          if (isRetry || isFailoverAndRetry) {
             LOG.info("Retrying connect to namenode: " + nnAddr
                 + ". Already tried " + retry + " time(s); retry policy is "
-              + retryPolicy + ", delay " + a.delayMillis + "ms.");
+                + retryPolicy + ", delay " + a.delayMillis + "ms.");
+
+            if (isFailoverAndRetry) {
+              resetStateToFailOver();
+            }
+
             Thread.sleep(a.delayMillis);
             return;
           }
@@ -619,6 +676,48 @@ public class WebHdfsFileSystem extends FileSystem
     }
   }
 
+  final class FsPathRunner extends AbstractRunner {
+    private final Path fspath;
+    private final Param<?, ?>[] parameters;
+
+    FsPathRunner(final HttpOpParam.Op op, final Path fspath, final Param<?,?>... parameters) {
+      super(op, false);
+      this.fspath = fspath;
+      this.parameters = parameters;
+    }
+
+    @Override
+    protected URL getUrl() throws IOException {
+      return toUrl(op, fspath, parameters);
+    }
+  }
+
+  final class URLRunner extends AbstractRunner {
+    private final URL url;
+    @Override
+    protected URL getUrl() {
+      return url;
+    }
+
+    protected URLRunner(final HttpOpParam.Op op, final URL url, boolean redirected) {
+      super(op, redirected);
+      this.url = url;
+    }
+  }
+
+  @VisibleForTesting
+  final class ConnRunner extends AbstractRunner {
+    protected ConnRunner(final HttpOpParam.Op op, HttpURLConnection conn) {
+      super(op, false);
+      this.conn = conn;
+    }
+
+    @Override
+    protected URL getUrl() {
+      return null;
+    }
+  }
+
   private FsPermission applyUMask(FsPermission permission) {
     if (permission == null) {
       permission = FsPermission.getDefault();
@@ -774,7 +873,7 @@ public class WebHdfsFileSystem extends FileSystem
     statistics.incrementWriteOps(1);
 
     final HttpOpParam.Op op = PutOpParam.Op.CREATE;
-    return new Runner(op, f,
+    return new FsPathRunner(op, f,
         new PermissionParam(applyUMask(permission)),
         new OverwriteParam(overwrite),
         new BufferSizeParam(bufferSize),
@@ -790,7 +889,7 @@ public class WebHdfsFileSystem extends FileSystem
     statistics.incrementWriteOps(1);
 
     final HttpOpParam.Op op = PostOpParam.Op.APPEND;
-    return new Runner(op, f, new BufferSizeParam(bufferSize))
+    return new FsPathRunner(op, f, new BufferSizeParam(bufferSize))
       .run()
       .write(bufferSize);
   }
@@ -837,7 +936,7 @@ public class WebHdfsFileSystem extends FileSystem
         final boolean resolved) throws IOException {
       final URL offsetUrl = offset == 0L? url
           : new URL(url + "&" + new OffsetParam(offset));
-      return new Runner(GetOpParam.Op.OPEN, offsetUrl, resolved).run().conn;
+      return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run().conn;
     }
   }
 
@@ -911,7 +1010,7 @@ public class WebHdfsFileSystem extends FileSystem
     final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN;
     final Map<?, ?> m = run(op, null, new RenewerParam(renewer));
     final Token<DelegationTokenIdentifier> token = JsonUtil.toDelegationToken(m);
-    SecurityUtil.setTokenService(token, nnAddr);
+    SecurityUtil.setTokenService(token, getCurrentNNAddr());
     return token;
   }
 
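
For context, the sketch below is not part of the patch; it shows how a client would exercise the new failover path. Pointing WebHDFS at a logical HA URI makes HAUtil.isLogicalUri() return true, so the failoverOnNetworkException retry policy built above is used instead of the default HTTP retry policy. The nameservice name "mycluster" and the tuning value are hypothetical, and the per-namenode HTTP addresses for that nameservice are assumed to be configured in hdfs-site.xml.

// Hypothetical client sketch, not part of the patch.
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class WebHdfsHaClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Optional tuning of the failover policy introduced by this change;
    // the value 4 is an arbitrary example.
    conf.setInt(DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 4);

    // "webhdfs://mycluster" is a logical (HA) URI, so WebHdfsFileSystem
    // resolves all configured namenode HTTP addresses and fails over
    // between them on network or standby errors.
    FileSystem fs = FileSystem.get(URI.create("webhdfs://mycluster/"), conf);
    FileStatus status = fs.getFileStatus(new Path("/"));
    System.out.println(status.getPath());
  }
}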