|
@@ -55,7 +55,6 @@ import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.hdfs.ByteRangeInputStream;
|
|
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
import org.apache.hadoop.hdfs.DFSUtil;
|
|
|
-import org.apache.hadoop.hdfs.NameNodeProxies;
|
|
|
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
|
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
|
|
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
|
|
@@ -89,7 +88,6 @@ import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
|
|
|
import org.apache.hadoop.hdfs.web.resources.UserParam;
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
-import org.apache.hadoop.io.retry.RetryPolicy;
|
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.security.AccessControlException;
|
|
@@ -149,7 +147,6 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
private URI uri;
|
|
|
private Token<?> delegationToken;
|
|
|
private final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
|
|
|
- private RetryPolicy retryPolicy = null;
|
|
|
private Path workingDir;
|
|
|
|
|
|
{
|
|
@@ -182,7 +179,6 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
throw new IllegalArgumentException(e);
|
|
|
}
|
|
|
this.nnAddr = NetUtils.createSocketAddr(uri.getAuthority(), getDefaultPort());
|
|
|
- this.retryPolicy = NameNodeProxies.getDefaultRetryPolicy(conf);
|
|
|
this.workingDir = getHomeDirectory();
|
|
|
|
|
|
if (UserGroupInformation.isSecurityEnabled()) {
|
|
@@ -280,13 +276,13 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
}
|
|
|
|
|
|
private static Map<?, ?> validateResponse(final HttpOpParam.Op op,
|
|
|
- final HttpURLConnection conn, boolean unwrapException) throws IOException {
|
|
|
+ final HttpURLConnection conn) throws IOException {
|
|
|
final int code = conn.getResponseCode();
|
|
|
if (code != op.getExpectedHttpResponseCode()) {
|
|
|
final Map<?, ?> m;
|
|
|
try {
|
|
|
m = jsonParse(conn, true);
|
|
|
- } catch(Exception e) {
|
|
|
+ } catch(IOException e) {
|
|
|
throw new IOException("Unexpected HTTP response: code=" + code + " != "
|
|
|
+ op.getExpectedHttpResponseCode() + ", " + op.toQueryString()
|
|
|
+ ", message=" + conn.getResponseMessage(), e);
|
|
@@ -297,42 +293,21 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
}
|
|
|
|
|
|
final RemoteException re = JsonUtil.toRemoteException(m);
|
|
|
- throw unwrapException? toIOException(re): re;
|
|
|
+ throw re.unwrapRemoteException(AccessControlException.class,
|
|
|
+ InvalidToken.class,
|
|
|
+ AuthenticationException.class,
|
|
|
+ AuthorizationException.class,
|
|
|
+ FileAlreadyExistsException.class,
|
|
|
+ FileNotFoundException.class,
|
|
|
+ ParentNotDirectoryException.class,
|
|
|
+ UnresolvedPathException.class,
|
|
|
+ SafeModeException.class,
|
|
|
+ DSQuotaExceededException.class,
|
|
|
+ NSQuotaExceededException.class);
|
|
|
}
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Covert an exception to an IOException.
|
|
|
- *
|
|
|
- * For a non-IOException, wrap it with IOException.
|
|
|
- * For a RemoteException, unwrap it.
|
|
|
- * For an IOException which is not a RemoteException, return it.
|
|
|
- */
|
|
|
- private static IOException toIOException(Exception e) {
|
|
|
- if (!(e instanceof IOException)) {
|
|
|
- return new IOException(e);
|
|
|
- }
|
|
|
-
|
|
|
- final IOException ioe = (IOException)e;
|
|
|
- if (!(ioe instanceof RemoteException)) {
|
|
|
- return ioe;
|
|
|
- }
|
|
|
-
|
|
|
- final RemoteException re = (RemoteException)ioe;
|
|
|
- return re.unwrapRemoteException(AccessControlException.class,
|
|
|
- InvalidToken.class,
|
|
|
- AuthenticationException.class,
|
|
|
- AuthorizationException.class,
|
|
|
- FileAlreadyExistsException.class,
|
|
|
- FileNotFoundException.class,
|
|
|
- ParentNotDirectoryException.class,
|
|
|
- UnresolvedPathException.class,
|
|
|
- SafeModeException.class,
|
|
|
- DSQuotaExceededException.class,
|
|
|
- NSQuotaExceededException.class);
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Return a URL pointing to given path on the namenode.
|
|
|
*
|
|
@@ -387,15 +362,70 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
}
|
|
|
|
|
|
private HttpURLConnection getHttpUrlConnection(URL url)
|
|
|
- throws IOException, AuthenticationException {
|
|
|
+ throws IOException {
|
|
|
final HttpURLConnection conn;
|
|
|
- if (ugi.hasKerberosCredentials()) {
|
|
|
- conn = new AuthenticatedURL(AUTH).openConnection(url, authToken);
|
|
|
- } else {
|
|
|
- conn = (HttpURLConnection)url.openConnection();
|
|
|
+ try {
|
|
|
+ if (ugi.hasKerberosCredentials()) {
|
|
|
+ conn = new AuthenticatedURL(AUTH).openConnection(url, authToken);
|
|
|
+ } else {
|
|
|
+ conn = (HttpURLConnection)url.openConnection();
|
|
|
+ }
|
|
|
+ } catch (AuthenticationException e) {
|
|
|
+ throw new IOException("Authentication failed, url=" + url, e);
|
|
|
}
|
|
|
return conn;
|
|
|
}
|
|
|
+
|
|
|
+ private HttpURLConnection httpConnect(final HttpOpParam.Op op, final Path fspath,
|
|
|
+ final Param<?,?>... parameters) throws IOException {
|
|
|
+ final URL url = toUrl(op, fspath, parameters);
|
|
|
+
|
|
|
+ //connect and get response
|
|
|
+ HttpURLConnection conn = getHttpUrlConnection(url);
|
|
|
+ try {
|
|
|
+ conn.setRequestMethod(op.getType().toString());
|
|
|
+ if (op.getDoOutput()) {
|
|
|
+ conn = twoStepWrite(conn, op);
|
|
|
+ conn.setRequestProperty("Content-Type", "application/octet-stream");
|
|
|
+ }
|
|
|
+ conn.setDoOutput(op.getDoOutput());
|
|
|
+ conn.connect();
|
|
|
+ return conn;
|
|
|
+ } catch (IOException e) {
|
|
|
+ conn.disconnect();
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Two-step Create/Append:
|
|
|
+ * Step 1) Submit a Http request with neither auto-redirect nor data.
|
|
|
+ * Step 2) Submit another Http request with the URL from the Location header with data.
|
|
|
+ *
|
|
|
+ * The reason of having two-step create/append is for preventing clients to
|
|
|
+ * send out the data before the redirect. This issue is addressed by the
|
|
|
+ * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
|
|
|
+ * Unfortunately, there are software library bugs (e.g. Jetty 6 http server
|
|
|
+ * and Java 6 http client), which do not correctly implement "Expect:
|
|
|
+ * 100-continue". The two-step create/append is a temporary workaround for
|
|
|
+ * the software library bugs.
|
|
|
+ */
|
|
|
+ static HttpURLConnection twoStepWrite(HttpURLConnection conn,
|
|
|
+ final HttpOpParam.Op op) throws IOException {
|
|
|
+ //Step 1) Submit a Http request with neither auto-redirect nor data.
|
|
|
+ conn.setInstanceFollowRedirects(false);
|
|
|
+ conn.setDoOutput(false);
|
|
|
+ conn.connect();
|
|
|
+ validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), conn);
|
|
|
+ final String redirect = conn.getHeaderField("Location");
|
|
|
+ conn.disconnect();
|
|
|
+
|
|
|
+ //Step 2) Submit another Http request with the URL from the Location header with data.
|
|
|
+ conn = (HttpURLConnection)new URL(redirect).openConnection();
|
|
|
+ conn.setRequestMethod(op.getType().toString());
|
|
|
+ conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
|
|
|
+ return conn;
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* Run a http operation.
|
|
@@ -409,158 +439,12 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
*/
|
|
|
private Map<?, ?> run(final HttpOpParam.Op op, final Path fspath,
|
|
|
final Param<?,?>... parameters) throws IOException {
|
|
|
- return new Runner(op, fspath, parameters).run().json;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * This class is for initialing a HTTP connection, connecting to server,
|
|
|
- * obtaining a response, and also handling retry on failures.
|
|
|
- */
|
|
|
- class Runner {
|
|
|
- private final HttpOpParam.Op op;
|
|
|
- private final URL url;
|
|
|
- private final boolean redirected;
|
|
|
-
|
|
|
- private boolean checkRetry;
|
|
|
- private HttpURLConnection conn = null;
|
|
|
- private Map<?, ?> json = null;
|
|
|
-
|
|
|
- Runner(final HttpOpParam.Op op, final URL url, final boolean redirected) {
|
|
|
- this.op = op;
|
|
|
- this.url = url;
|
|
|
- this.redirected = redirected;
|
|
|
- }
|
|
|
-
|
|
|
- Runner(final HttpOpParam.Op op, final Path fspath,
|
|
|
- final Param<?,?>... parameters) throws IOException {
|
|
|
- this(op, toUrl(op, fspath, parameters), false);
|
|
|
- }
|
|
|
-
|
|
|
- Runner(final HttpOpParam.Op op, final HttpURLConnection conn) {
|
|
|
- this(op, null, false);
|
|
|
- this.conn = conn;
|
|
|
- }
|
|
|
-
|
|
|
- private void init() throws IOException {
|
|
|
- checkRetry = !redirected;
|
|
|
- try {
|
|
|
- conn = getHttpUrlConnection(url);
|
|
|
- } catch(AuthenticationException ae) {
|
|
|
- checkRetry = false;
|
|
|
- throw new IOException("Authentication failed, url=" + url, ae);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void connect() throws IOException {
|
|
|
- connect(op.getDoOutput());
|
|
|
- }
|
|
|
-
|
|
|
- private void connect(boolean doOutput) throws IOException {
|
|
|
- conn.setRequestMethod(op.getType().toString());
|
|
|
- conn.setDoOutput(doOutput);
|
|
|
- conn.setInstanceFollowRedirects(false);
|
|
|
- conn.connect();
|
|
|
- }
|
|
|
-
|
|
|
- private void disconnect() {
|
|
|
- if (conn != null) {
|
|
|
- conn.disconnect();
|
|
|
- conn = null;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- Runner run() throws IOException {
|
|
|
- for(int retry = 0; ; retry++) {
|
|
|
- try {
|
|
|
- init();
|
|
|
- if (op.getDoOutput()) {
|
|
|
- twoStepWrite();
|
|
|
- } else {
|
|
|
- getResponse(op != GetOpParam.Op.OPEN);
|
|
|
- }
|
|
|
- return this;
|
|
|
- } catch(IOException ioe) {
|
|
|
- shouldRetry(ioe, retry);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void shouldRetry(final IOException ioe, final int retry
|
|
|
- ) throws IOException {
|
|
|
- if (checkRetry) {
|
|
|
- try {
|
|
|
- final RetryPolicy.RetryAction a = retryPolicy.shouldRetry(
|
|
|
- ioe, retry, 0, true);
|
|
|
- if (a.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
|
|
|
- LOG.info("Retrying connect to namenode: " + nnAddr
|
|
|
- + ". Already tried " + retry + " time(s); retry policy is "
|
|
|
- + retryPolicy + ", delay " + a.delayMillis + "ms.");
|
|
|
- Thread.sleep(a.delayMillis);
|
|
|
- return;
|
|
|
- }
|
|
|
- } catch(Exception e) {
|
|
|
- LOG.warn("Original exception is ", ioe);
|
|
|
- throw toIOException(e);
|
|
|
- }
|
|
|
- }
|
|
|
- throw toIOException(ioe);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Two-step Create/Append:
|
|
|
- * Step 1) Submit a Http request with neither auto-redirect nor data.
|
|
|
- * Step 2) Submit another Http request with the URL from the Location header with data.
|
|
|
- *
|
|
|
- * The reason of having two-step create/append is for preventing clients to
|
|
|
- * send out the data before the redirect. This issue is addressed by the
|
|
|
- * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
|
|
|
- * Unfortunately, there are software library bugs (e.g. Jetty 6 http server
|
|
|
- * and Java 6 http client), which do not correctly implement "Expect:
|
|
|
- * 100-continue". The two-step create/append is a temporary workaround for
|
|
|
- * the software library bugs.
|
|
|
- */
|
|
|
- HttpURLConnection twoStepWrite() throws IOException {
|
|
|
- //Step 1) Submit a Http request with neither auto-redirect nor data.
|
|
|
- connect(false);
|
|
|
- validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), conn, false);
|
|
|
- final String redirect = conn.getHeaderField("Location");
|
|
|
- disconnect();
|
|
|
- checkRetry = false;
|
|
|
-
|
|
|
- //Step 2) Submit another Http request with the URL from the Location header with data.
|
|
|
- conn = (HttpURLConnection)new URL(redirect).openConnection();
|
|
|
- conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
|
|
|
- connect();
|
|
|
- return conn;
|
|
|
- }
|
|
|
-
|
|
|
- FSDataOutputStream write(final int bufferSize) throws IOException {
|
|
|
- return WebHdfsFileSystem.this.write(op, conn, bufferSize);
|
|
|
- }
|
|
|
-
|
|
|
- void getResponse(boolean getJsonAndDisconnect) throws IOException {
|
|
|
- try {
|
|
|
- connect();
|
|
|
- if (!redirected && op.getRedirect()) {
|
|
|
- final String redirect = conn.getHeaderField("Location");
|
|
|
- json = validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op),
|
|
|
- conn, false);
|
|
|
- disconnect();
|
|
|
-
|
|
|
- checkRetry = false;
|
|
|
- conn = (HttpURLConnection)new URL(redirect).openConnection();
|
|
|
- connect();
|
|
|
- }
|
|
|
-
|
|
|
- json = validateResponse(op, conn, false);
|
|
|
- if (json == null && getJsonAndDisconnect) {
|
|
|
- json = jsonParse(conn, false);
|
|
|
- }
|
|
|
- } finally {
|
|
|
- if (getJsonAndDisconnect) {
|
|
|
- disconnect();
|
|
|
- }
|
|
|
- }
|
|
|
+ final HttpURLConnection conn = httpConnect(op, fspath, parameters);
|
|
|
+ try {
|
|
|
+ final Map<?, ?> m = validateResponse(op, conn);
|
|
|
+ return m != null? m: jsonParse(conn, false);
|
|
|
+ } finally {
|
|
|
+ conn.disconnect();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -694,7 +578,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
super.close();
|
|
|
} finally {
|
|
|
try {
|
|
|
- validateResponse(op, conn, true);
|
|
|
+ validateResponse(op, conn);
|
|
|
} finally {
|
|
|
conn.disconnect();
|
|
|
}
|
|
@@ -710,14 +594,13 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
statistics.incrementWriteOps(1);
|
|
|
|
|
|
final HttpOpParam.Op op = PutOpParam.Op.CREATE;
|
|
|
- return new Runner(op, f,
|
|
|
+ final HttpURLConnection conn = httpConnect(op, f,
|
|
|
new PermissionParam(applyUMask(permission)),
|
|
|
new OverwriteParam(overwrite),
|
|
|
new BufferSizeParam(bufferSize),
|
|
|
new ReplicationParam(replication),
|
|
|
- new BlockSizeParam(blockSize))
|
|
|
- .run()
|
|
|
- .write(bufferSize);
|
|
|
+ new BlockSizeParam(blockSize));
|
|
|
+ return write(op, conn, bufferSize);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -726,9 +609,9 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
statistics.incrementWriteOps(1);
|
|
|
|
|
|
final HttpOpParam.Op op = PostOpParam.Op.APPEND;
|
|
|
- return new Runner(op, f, new BufferSizeParam(bufferSize))
|
|
|
- .run()
|
|
|
- .write(bufferSize);
|
|
|
+ final HttpURLConnection conn = httpConnect(op, f,
|
|
|
+ new BufferSizeParam(bufferSize));
|
|
|
+ return write(op, conn, bufferSize);
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings("deprecation")
|
|
@@ -755,17 +638,26 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
}
|
|
|
|
|
|
class OffsetUrlOpener extends ByteRangeInputStream.URLOpener {
|
|
|
+ /** The url with offset parameter */
|
|
|
+ private URL offsetUrl;
|
|
|
+
|
|
|
OffsetUrlOpener(final URL url) {
|
|
|
super(url);
|
|
|
}
|
|
|
|
|
|
- /** Setup offset url and connect. */
|
|
|
+ /** Open connection with offset url. */
|
|
|
+ @Override
|
|
|
+ protected HttpURLConnection openConnection() throws IOException {
|
|
|
+ return getHttpUrlConnection(offsetUrl);
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Setup offset url before open connection. */
|
|
|
@Override
|
|
|
- protected HttpURLConnection connect(final long offset,
|
|
|
- final boolean resolved) throws IOException {
|
|
|
- final URL offsetUrl = offset == 0L? url
|
|
|
- : new URL(url + "&" + new OffsetParam(offset));
|
|
|
- return new Runner(GetOpParam.Op.OPEN, offsetUrl, resolved).run().conn;
|
|
|
+ protected HttpURLConnection openConnection(final long offset) throws IOException {
|
|
|
+ offsetUrl = offset == 0L? url: new URL(url + "&" + new OffsetParam(offset));
|
|
|
+ final HttpURLConnection conn = openConnection();
|
|
|
+ conn.setRequestMethod("GET");
|
|
|
+ return conn;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -806,6 +698,12 @@ public class WebHdfsFileSystem extends FileSystem
|
|
|
OffsetUrlInputStream(OffsetUrlOpener o, OffsetUrlOpener r) {
|
|
|
super(o, r);
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void checkResponseCode(final HttpURLConnection connection
|
|
|
+ ) throws IOException {
|
|
|
+ validateResponse(GetOpParam.Op.OPEN, connection);
|
|
|
+ }
|
|
|
|
|
|
/** Remove offset parameter before returning the resolved url. */
|
|
|
@Override
|