|
@@ -19,6 +19,7 @@ package org.apache.hadoop.fs.http.client;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.fs.ContentSummary;
|
|
import org.apache.hadoop.fs.ContentSummary;
|
|
|
|
+import org.apache.hadoop.fs.DelegationTokenRenewer;
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
import org.apache.hadoop.fs.FileChecksum;
|
|
import org.apache.hadoop.fs.FileChecksum;
|
|
@@ -28,16 +29,18 @@ import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.PositionedReadable;
|
|
import org.apache.hadoop.fs.PositionedReadable;
|
|
import org.apache.hadoop.fs.Seekable;
|
|
import org.apache.hadoop.fs.Seekable;
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
|
+import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
|
+import org.apache.hadoop.net.NetUtils;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
|
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
|
import org.apache.hadoop.security.authentication.client.Authenticator;
|
|
import org.apache.hadoop.security.authentication.client.Authenticator;
|
|
|
|
+import org.apache.hadoop.security.token.Token;
|
|
|
|
+import org.apache.hadoop.security.token.TokenIdentifier;
|
|
import org.apache.hadoop.util.Progressable;
|
|
import org.apache.hadoop.util.Progressable;
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
import org.apache.hadoop.util.StringUtils;
|
|
import org.apache.hadoop.util.StringUtils;
|
|
import org.json.simple.JSONArray;
|
|
import org.json.simple.JSONArray;
|
|
import org.json.simple.JSONObject;
|
|
import org.json.simple.JSONObject;
|
|
-import org.json.simple.parser.JSONParser;
|
|
|
|
-import org.json.simple.parser.ParseException;
|
|
|
|
|
|
|
|
import java.io.BufferedInputStream;
|
|
import java.io.BufferedInputStream;
|
|
import java.io.BufferedOutputStream;
|
|
import java.io.BufferedOutputStream;
|
|
@@ -47,30 +50,32 @@ import java.io.FileNotFoundException;
|
|
import java.io.FilterInputStream;
|
|
import java.io.FilterInputStream;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.io.InputStream;
|
|
import java.io.InputStream;
|
|
-import java.io.InputStreamReader;
|
|
|
|
import java.io.OutputStream;
|
|
import java.io.OutputStream;
|
|
-import java.lang.reflect.Constructor;
|
|
|
|
import java.net.HttpURLConnection;
|
|
import java.net.HttpURLConnection;
|
|
|
|
+import java.net.InetSocketAddress;
|
|
import java.net.URI;
|
|
import java.net.URI;
|
|
import java.net.URISyntaxException;
|
|
import java.net.URISyntaxException;
|
|
import java.net.URL;
|
|
import java.net.URL;
|
|
-import java.net.URLEncoder;
|
|
|
|
|
|
+import java.security.PrivilegedExceptionAction;
|
|
import java.text.MessageFormat;
|
|
import java.text.MessageFormat;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
|
|
+import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
|
+import java.util.concurrent.Callable;
|
|
|
|
|
|
/**
|
|
/**
|
|
* HttpFSServer implementation of the FileSystemAccess FileSystem.
|
|
* HttpFSServer implementation of the FileSystemAccess FileSystem.
|
|
* <p/>
|
|
* <p/>
|
|
* This implementation allows a user to access HDFS over HTTP via a HttpFSServer server.
|
|
* This implementation allows a user to access HDFS over HTTP via a HttpFSServer server.
|
|
*/
|
|
*/
|
|
-public class HttpFSFileSystem extends FileSystem {
|
|
|
|
|
|
+public class HttpFSFileSystem extends FileSystem
|
|
|
|
+ implements DelegationTokenRenewer.Renewable {
|
|
|
|
|
|
- public static final String SERVICE_NAME = "/webhdfs";
|
|
|
|
|
|
+ public static final String SERVICE_NAME = HttpFSUtils.SERVICE_NAME;
|
|
|
|
|
|
- public static final String SERVICE_VERSION = "/v1";
|
|
|
|
|
|
+ public static final String SERVICE_VERSION = HttpFSUtils.SERVICE_VERSION;
|
|
|
|
|
|
- public static final String SERVICE_PREFIX = SERVICE_NAME + SERVICE_VERSION;
|
|
|
|
|
|
+ public static final String SCHEME = "webhdfs";
|
|
|
|
|
|
public static final String OP_PARAM = "op";
|
|
public static final String OP_PARAM = "op";
|
|
public static final String DO_AS_PARAM = "doas";
|
|
public static final String DO_AS_PARAM = "doas";
|
|
@@ -84,7 +89,6 @@ public class HttpFSFileSystem extends FileSystem {
|
|
public static final String GROUP_PARAM = "group";
|
|
public static final String GROUP_PARAM = "group";
|
|
public static final String MODIFICATION_TIME_PARAM = "modificationtime";
|
|
public static final String MODIFICATION_TIME_PARAM = "modificationtime";
|
|
public static final String ACCESS_TIME_PARAM = "accesstime";
|
|
public static final String ACCESS_TIME_PARAM = "accesstime";
|
|
- public static final String RENEWER_PARAM = "renewer";
|
|
|
|
|
|
|
|
public static final Short DEFAULT_PERMISSION = 0755;
|
|
public static final Short DEFAULT_PERMISSION = 0755;
|
|
|
|
|
|
@@ -144,9 +148,6 @@ public class HttpFSFileSystem extends FileSystem {
|
|
public static final String CONTENT_SUMMARY_SPACE_CONSUMED_JSON = "spaceConsumed";
|
|
public static final String CONTENT_SUMMARY_SPACE_CONSUMED_JSON = "spaceConsumed";
|
|
public static final String CONTENT_SUMMARY_SPACE_QUOTA_JSON = "spaceQuota";
|
|
public static final String CONTENT_SUMMARY_SPACE_QUOTA_JSON = "spaceQuota";
|
|
|
|
|
|
- public static final String DELEGATION_TOKEN_JSON = "Token";
|
|
|
|
- public static final String DELEGATION_TOKEN_URL_STRING_JSON = "urlString";
|
|
|
|
-
|
|
|
|
public static final String ERROR_JSON = "RemoteException";
|
|
public static final String ERROR_JSON = "RemoteException";
|
|
public static final String ERROR_EXCEPTION_JSON = "exception";
|
|
public static final String ERROR_EXCEPTION_JSON = "exception";
|
|
public static final String ERROR_CLASSNAME_JSON = "javaClassName";
|
|
public static final String ERROR_CLASSNAME_JSON = "javaClassName";
|
|
@@ -184,8 +185,31 @@ public class HttpFSFileSystem extends FileSystem {
|
|
|
|
|
|
private AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
|
|
private AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
|
|
private URI uri;
|
|
private URI uri;
|
|
|
|
+ private InetSocketAddress httpFSAddr;
|
|
private Path workingDir;
|
|
private Path workingDir;
|
|
|
|
+ private UserGroupInformation realUser;
|
|
private String doAs;
|
|
private String doAs;
|
|
|
|
+ private Token<?> delegationToken;
|
|
|
|
+
|
|
|
|
+ //This method enables handling UGI doAs with SPNEGO, we have to
|
|
|
|
+ //fallback to the realuser who logged in with Kerberos credentials
|
|
|
|
+ private <T> T doAsRealUserIfNecessary(final Callable<T> callable)
|
|
|
|
+ throws IOException {
|
|
|
|
+ try {
|
|
|
|
+ if (realUser.getShortUserName().equals(doAs)) {
|
|
|
|
+ return callable.call();
|
|
|
|
+ } else {
|
|
|
|
+ return realUser.doAs(new PrivilegedExceptionAction<T>() {
|
|
|
|
+ @Override
|
|
|
|
+ public T run() throws Exception {
|
|
|
|
+ return callable.call();
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ }
|
|
|
|
+ } catch (Exception ex) {
|
|
|
|
+ throw new IOException(ex.toString(), ex);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
/**
|
|
* Convenience method that creates a <code>HttpURLConnection</code> for the
|
|
* Convenience method that creates a <code>HttpURLConnection</code> for the
|
|
@@ -204,25 +228,23 @@ public class HttpFSFileSystem extends FileSystem {
|
|
*
|
|
*
|
|
* @throws IOException thrown if an IO error occurrs.
|
|
* @throws IOException thrown if an IO error occurrs.
|
|
*/
|
|
*/
|
|
- private HttpURLConnection getConnection(String method, Map<String, String> params,
|
|
|
|
- Path path, boolean makeQualified) throws IOException {
|
|
|
|
- params.put(DO_AS_PARAM, doAs);
|
|
|
|
|
|
+ private HttpURLConnection getConnection(final String method,
|
|
|
|
+ Map<String, String> params, Path path, boolean makeQualified)
|
|
|
|
+ throws IOException {
|
|
|
|
+ if (!realUser.getShortUserName().equals(doAs)) {
|
|
|
|
+ params.put(DO_AS_PARAM, doAs);
|
|
|
|
+ }
|
|
|
|
+ HttpFSKerberosAuthenticator.injectDelegationToken(params, delegationToken);
|
|
if (makeQualified) {
|
|
if (makeQualified) {
|
|
path = makeQualified(path);
|
|
path = makeQualified(path);
|
|
}
|
|
}
|
|
- URI uri = path.toUri();
|
|
|
|
- StringBuilder sb = new StringBuilder();
|
|
|
|
- sb.append(uri.getScheme()).append("://").append(uri.getAuthority()).
|
|
|
|
- append(SERVICE_PREFIX).append(uri.getPath());
|
|
|
|
-
|
|
|
|
- String separator = "?";
|
|
|
|
- for (Map.Entry<String, String> entry : params.entrySet()) {
|
|
|
|
- sb.append(separator).append(entry.getKey()).append("=").
|
|
|
|
- append(URLEncoder.encode(entry.getValue(), "UTF8"));
|
|
|
|
- separator = "&";
|
|
|
|
- }
|
|
|
|
- URL url = new URL(sb.toString());
|
|
|
|
- return getConnection(url, method);
|
|
|
|
|
|
+ final URL url = HttpFSUtils.createHttpURL(path, params);
|
|
|
|
+ return doAsRealUserIfNecessary(new Callable<HttpURLConnection>() {
|
|
|
|
+ @Override
|
|
|
|
+ public HttpURLConnection call() throws Exception {
|
|
|
|
+ return getConnection(url, method);
|
|
|
|
+ }
|
|
|
|
+ });
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -240,7 +262,8 @@ public class HttpFSFileSystem extends FileSystem {
|
|
*/
|
|
*/
|
|
private HttpURLConnection getConnection(URL url, String method) throws IOException {
|
|
private HttpURLConnection getConnection(URL url, String method) throws IOException {
|
|
Class<? extends Authenticator> klass =
|
|
Class<? extends Authenticator> klass =
|
|
- getConf().getClass("httpfs.authenticator.class", HttpKerberosAuthenticator.class, Authenticator.class);
|
|
|
|
|
|
+ getConf().getClass("httpfs.authenticator.class",
|
|
|
|
+ HttpFSKerberosAuthenticator.class, Authenticator.class);
|
|
Authenticator authenticator = ReflectionUtils.newInstance(klass, getConf());
|
|
Authenticator authenticator = ReflectionUtils.newInstance(klass, getConf());
|
|
try {
|
|
try {
|
|
HttpURLConnection conn = new AuthenticatedURL(authenticator).openConnection(url, authToken);
|
|
HttpURLConnection conn = new AuthenticatedURL(authenticator).openConnection(url, authToken);
|
|
@@ -254,63 +277,6 @@ public class HttpFSFileSystem extends FileSystem {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * Convenience method that JSON Parses the <code>InputStream</code> of a <code>HttpURLConnection</code>.
|
|
|
|
- *
|
|
|
|
- * @param conn the <code>HttpURLConnection</code>.
|
|
|
|
- *
|
|
|
|
- * @return the parsed JSON object.
|
|
|
|
- *
|
|
|
|
- * @throws IOException thrown if the <code>InputStream</code> could not be JSON parsed.
|
|
|
|
- */
|
|
|
|
- private static Object jsonParse(HttpURLConnection conn) throws IOException {
|
|
|
|
- try {
|
|
|
|
- JSONParser parser = new JSONParser();
|
|
|
|
- return parser.parse(new InputStreamReader(conn.getInputStream()));
|
|
|
|
- } catch (ParseException ex) {
|
|
|
|
- throw new IOException("JSON parser error, " + ex.getMessage(), ex);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Validates the status of an <code>HttpURLConnection</code> against an expected HTTP
|
|
|
|
- * status code. If the current status code is not the expected one it throws an exception
|
|
|
|
- * with a detail message using Server side error messages if available.
|
|
|
|
- *
|
|
|
|
- * @param conn the <code>HttpURLConnection</code>.
|
|
|
|
- * @param expected the expected HTTP status code.
|
|
|
|
- *
|
|
|
|
- * @throws IOException thrown if the current status code does not match the expected one.
|
|
|
|
- */
|
|
|
|
- private static void validateResponse(HttpURLConnection conn, int expected) throws IOException {
|
|
|
|
- int status = conn.getResponseCode();
|
|
|
|
- if (status != expected) {
|
|
|
|
- try {
|
|
|
|
- JSONObject json = (JSONObject) jsonParse(conn);
|
|
|
|
- json = (JSONObject) json.get(ERROR_JSON);
|
|
|
|
- String message = (String) json.get(ERROR_MESSAGE_JSON);
|
|
|
|
- String exception = (String) json.get(ERROR_EXCEPTION_JSON);
|
|
|
|
- String className = (String) json.get(ERROR_CLASSNAME_JSON);
|
|
|
|
-
|
|
|
|
- try {
|
|
|
|
- ClassLoader cl = HttpFSFileSystem.class.getClassLoader();
|
|
|
|
- Class klass = cl.loadClass(className);
|
|
|
|
- Constructor constr = klass.getConstructor(String.class);
|
|
|
|
- throw (IOException) constr.newInstance(message);
|
|
|
|
- } catch (IOException ex) {
|
|
|
|
- throw ex;
|
|
|
|
- } catch (Exception ex) {
|
|
|
|
- throw new IOException(MessageFormat.format("{0} - {1}", exception, message));
|
|
|
|
- }
|
|
|
|
- } catch (IOException ex) {
|
|
|
|
- if (ex.getCause() instanceof IOException) {
|
|
|
|
- throw (IOException) ex.getCause();
|
|
|
|
- }
|
|
|
|
- throw new IOException(MessageFormat.format("HTTP status [{0}], {1}", status, conn.getResponseMessage()));
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Called after a new FileSystem instance is constructed.
|
|
* Called after a new FileSystem instance is constructed.
|
|
*
|
|
*
|
|
@@ -320,15 +286,28 @@ public class HttpFSFileSystem extends FileSystem {
|
|
@Override
|
|
@Override
|
|
public void initialize(URI name, Configuration conf) throws IOException {
|
|
public void initialize(URI name, Configuration conf) throws IOException {
|
|
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
- doAs = ugi.getUserName();
|
|
|
|
|
|
+
|
|
|
|
+ //the real use is the one that has the Kerberos credentials needed for
|
|
|
|
+ //SPNEGO to work
|
|
|
|
+ realUser = ugi.getRealUser();
|
|
|
|
+ if (realUser == null) {
|
|
|
|
+ realUser = UserGroupInformation.getLoginUser();
|
|
|
|
+ }
|
|
|
|
+ doAs = ugi.getShortUserName();
|
|
super.initialize(name, conf);
|
|
super.initialize(name, conf);
|
|
try {
|
|
try {
|
|
- uri = new URI(name.getScheme() + "://" + name.getHost() + ":" + name.getPort());
|
|
|
|
|
|
+ uri = new URI(name.getScheme() + "://" + name.getAuthority());
|
|
|
|
+ httpFSAddr = NetUtils.createSocketAddr(getCanonicalUri().toString());
|
|
} catch (URISyntaxException ex) {
|
|
} catch (URISyntaxException ex) {
|
|
throw new IOException(ex);
|
|
throw new IOException(ex);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Override
|
|
|
|
+ public String getScheme() {
|
|
|
|
+ return SCHEME;
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Returns a URI whose scheme and authority identify this FileSystem.
|
|
* Returns a URI whose scheme and authority identify this FileSystem.
|
|
*
|
|
*
|
|
@@ -339,6 +318,16 @@ public class HttpFSFileSystem extends FileSystem {
|
|
return uri;
|
|
return uri;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Get the default port for this file system.
|
|
|
|
+ * @return the default port or 0 if there isn't one
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ protected int getDefaultPort() {
|
|
|
|
+ return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY,
|
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT);
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* HttpFSServer subclass of the <code>FSDataInputStream</code>.
|
|
* HttpFSServer subclass of the <code>FSDataInputStream</code>.
|
|
* <p/>
|
|
* <p/>
|
|
@@ -397,7 +386,7 @@ public class HttpFSFileSystem extends FileSystem {
|
|
params.put(OP_PARAM, Operation.OPEN.toString());
|
|
params.put(OP_PARAM, Operation.OPEN.toString());
|
|
HttpURLConnection conn = getConnection(Operation.OPEN.getMethod(), params,
|
|
HttpURLConnection conn = getConnection(Operation.OPEN.getMethod(), params,
|
|
f, true);
|
|
f, true);
|
|
- validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
|
|
|
|
+ HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
return new FSDataInputStream(
|
|
return new FSDataInputStream(
|
|
new HttpFSDataInputStream(conn.getInputStream(), bufferSize));
|
|
new HttpFSDataInputStream(conn.getInputStream(), bufferSize));
|
|
}
|
|
}
|
|
@@ -424,7 +413,7 @@ public class HttpFSFileSystem extends FileSystem {
|
|
try {
|
|
try {
|
|
super.close();
|
|
super.close();
|
|
} finally {
|
|
} finally {
|
|
- validateResponse(conn, closeStatus);
|
|
|
|
|
|
+ HttpFSUtils.validateResponse(conn, closeStatus);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -460,11 +449,11 @@ public class HttpFSFileSystem extends FileSystem {
|
|
OutputStream os = new BufferedOutputStream(conn.getOutputStream(), bufferSize);
|
|
OutputStream os = new BufferedOutputStream(conn.getOutputStream(), bufferSize);
|
|
return new HttpFSDataOutputStream(conn, os, expectedStatus, statistics);
|
|
return new HttpFSDataOutputStream(conn, os, expectedStatus, statistics);
|
|
} catch (IOException ex) {
|
|
} catch (IOException ex) {
|
|
- validateResponse(conn, expectedStatus);
|
|
|
|
|
|
+ HttpFSUtils.validateResponse(conn, expectedStatus);
|
|
throw ex;
|
|
throw ex;
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
- validateResponse(conn, HTTP_TEMPORARY_REDIRECT);
|
|
|
|
|
|
+ HttpFSUtils.validateResponse(conn, HTTP_TEMPORARY_REDIRECT);
|
|
throw new IOException("Missing HTTP 'Location' header for [" + conn.getURL() + "]");
|
|
throw new IOException("Missing HTTP 'Location' header for [" + conn.getURL() + "]");
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
@@ -476,7 +465,7 @@ public class HttpFSFileSystem extends FileSystem {
|
|
if (exceptionAlreadyHandled) {
|
|
if (exceptionAlreadyHandled) {
|
|
throw ex;
|
|
throw ex;
|
|
} else {
|
|
} else {
|
|
- validateResponse(conn, HTTP_TEMPORARY_REDIRECT);
|
|
|
|
|
|
+ HttpFSUtils.validateResponse(conn, HTTP_TEMPORARY_REDIRECT);
|
|
throw ex;
|
|
throw ex;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -548,8 +537,8 @@ public class HttpFSFileSystem extends FileSystem {
|
|
params.put(DESTINATION_PARAM, dst.toString());
|
|
params.put(DESTINATION_PARAM, dst.toString());
|
|
HttpURLConnection conn = getConnection(Operation.RENAME.getMethod(),
|
|
HttpURLConnection conn = getConnection(Operation.RENAME.getMethod(),
|
|
params, src, true);
|
|
params, src, true);
|
|
- validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
|
|
- JSONObject json = (JSONObject) jsonParse(conn);
|
|
|
|
|
|
+ HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
|
|
+ JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
|
|
return (Boolean) json.get(RENAME_JSON);
|
|
return (Boolean) json.get(RENAME_JSON);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -584,8 +573,8 @@ public class HttpFSFileSystem extends FileSystem {
|
|
params.put(RECURSIVE_PARAM, Boolean.toString(recursive));
|
|
params.put(RECURSIVE_PARAM, Boolean.toString(recursive));
|
|
HttpURLConnection conn = getConnection(Operation.DELETE.getMethod(),
|
|
HttpURLConnection conn = getConnection(Operation.DELETE.getMethod(),
|
|
params, f, true);
|
|
params, f, true);
|
|
- validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
|
|
- JSONObject json = (JSONObject) jsonParse(conn);
|
|
|
|
|
|
+ HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
|
|
+ JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
|
|
return (Boolean) json.get(DELETE_JSON);
|
|
return (Boolean) json.get(DELETE_JSON);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -605,8 +594,8 @@ public class HttpFSFileSystem extends FileSystem {
|
|
params.put(OP_PARAM, Operation.LISTSTATUS.toString());
|
|
params.put(OP_PARAM, Operation.LISTSTATUS.toString());
|
|
HttpURLConnection conn = getConnection(Operation.LISTSTATUS.getMethod(),
|
|
HttpURLConnection conn = getConnection(Operation.LISTSTATUS.getMethod(),
|
|
params, f, true);
|
|
params, f, true);
|
|
- validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
|
|
- JSONObject json = (JSONObject) jsonParse(conn);
|
|
|
|
|
|
+ HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
|
|
+ JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
|
|
json = (JSONObject) json.get(FILE_STATUSES_JSON);
|
|
json = (JSONObject) json.get(FILE_STATUSES_JSON);
|
|
JSONArray jsonArray = (JSONArray) json.get(FILE_STATUS_JSON);
|
|
JSONArray jsonArray = (JSONArray) json.get(FILE_STATUS_JSON);
|
|
FileStatus[] array = new FileStatus[jsonArray.size()];
|
|
FileStatus[] array = new FileStatus[jsonArray.size()];
|
|
@@ -653,8 +642,8 @@ public class HttpFSFileSystem extends FileSystem {
|
|
params.put(PERMISSION_PARAM, permissionToString(permission));
|
|
params.put(PERMISSION_PARAM, permissionToString(permission));
|
|
HttpURLConnection conn = getConnection(Operation.MKDIRS.getMethod(),
|
|
HttpURLConnection conn = getConnection(Operation.MKDIRS.getMethod(),
|
|
params, f, true);
|
|
params, f, true);
|
|
- validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
|
|
- JSONObject json = (JSONObject) jsonParse(conn);
|
|
|
|
|
|
+ HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
|
|
+ JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
|
|
return (Boolean) json.get(MKDIRS_JSON);
|
|
return (Boolean) json.get(MKDIRS_JSON);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -674,8 +663,8 @@ public class HttpFSFileSystem extends FileSystem {
|
|
params.put(OP_PARAM, Operation.GETFILESTATUS.toString());
|
|
params.put(OP_PARAM, Operation.GETFILESTATUS.toString());
|
|
HttpURLConnection conn = getConnection(Operation.GETFILESTATUS.getMethod(),
|
|
HttpURLConnection conn = getConnection(Operation.GETFILESTATUS.getMethod(),
|
|
params, f, true);
|
|
params, f, true);
|
|
- validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
|
|
- JSONObject json = (JSONObject) jsonParse(conn);
|
|
|
|
|
|
+ HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
|
|
+ JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
|
|
json = (JSONObject) json.get(FILE_STATUS_JSON);
|
|
json = (JSONObject) json.get(FILE_STATUS_JSON);
|
|
f = makeQualified(f);
|
|
f = makeQualified(f);
|
|
return createFileStatus(f, json);
|
|
return createFileStatus(f, json);
|
|
@@ -693,8 +682,8 @@ public class HttpFSFileSystem extends FileSystem {
|
|
HttpURLConnection conn =
|
|
HttpURLConnection conn =
|
|
getConnection(Operation.GETHOMEDIRECTORY.getMethod(), params,
|
|
getConnection(Operation.GETHOMEDIRECTORY.getMethod(), params,
|
|
new Path(getUri().toString(), "/"), false);
|
|
new Path(getUri().toString(), "/"), false);
|
|
- validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
|
|
- JSONObject json = (JSONObject) jsonParse(conn);
|
|
|
|
|
|
+ HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
|
|
+ JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
|
|
return new Path((String) json.get(HOME_DIR_JSON));
|
|
return new Path((String) json.get(HOME_DIR_JSON));
|
|
} catch (IOException ex) {
|
|
} catch (IOException ex) {
|
|
throw new RuntimeException(ex);
|
|
throw new RuntimeException(ex);
|
|
@@ -718,7 +707,7 @@ public class HttpFSFileSystem extends FileSystem {
|
|
params.put(GROUP_PARAM, groupname);
|
|
params.put(GROUP_PARAM, groupname);
|
|
HttpURLConnection conn = getConnection(Operation.SETOWNER.getMethod(),
|
|
HttpURLConnection conn = getConnection(Operation.SETOWNER.getMethod(),
|
|
params, p, true);
|
|
params, p, true);
|
|
- validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
|
|
|
|
+ HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -733,7 +722,7 @@ public class HttpFSFileSystem extends FileSystem {
|
|
params.put(OP_PARAM, Operation.SETPERMISSION.toString());
|
|
params.put(OP_PARAM, Operation.SETPERMISSION.toString());
|
|
params.put(PERMISSION_PARAM, permissionToString(permission));
|
|
params.put(PERMISSION_PARAM, permissionToString(permission));
|
|
HttpURLConnection conn = getConnection(Operation.SETPERMISSION.getMethod(), params, p, true);
|
|
HttpURLConnection conn = getConnection(Operation.SETPERMISSION.getMethod(), params, p, true);
|
|
- validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
|
|
|
|
+ HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -755,7 +744,7 @@ public class HttpFSFileSystem extends FileSystem {
|
|
params.put(ACCESS_TIME_PARAM, Long.toString(atime));
|
|
params.put(ACCESS_TIME_PARAM, Long.toString(atime));
|
|
HttpURLConnection conn = getConnection(Operation.SETTIMES.getMethod(),
|
|
HttpURLConnection conn = getConnection(Operation.SETTIMES.getMethod(),
|
|
params, p, true);
|
|
params, p, true);
|
|
- validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
|
|
|
|
+ HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -777,19 +766,11 @@ public class HttpFSFileSystem extends FileSystem {
|
|
params.put(REPLICATION_PARAM, Short.toString(replication));
|
|
params.put(REPLICATION_PARAM, Short.toString(replication));
|
|
HttpURLConnection conn =
|
|
HttpURLConnection conn =
|
|
getConnection(Operation.SETREPLICATION.getMethod(), params, src, true);
|
|
getConnection(Operation.SETREPLICATION.getMethod(), params, src, true);
|
|
- validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
|
|
- JSONObject json = (JSONObject) jsonParse(conn);
|
|
|
|
|
|
+ HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
|
|
+ JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
|
|
return (Boolean) json.get(SET_REPLICATION_JSON);
|
|
return (Boolean) json.get(SET_REPLICATION_JSON);
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * Creates a <code>FileStatus</code> object using a JSON file-status payload
|
|
|
|
- * received from a HttpFSServer server.
|
|
|
|
- *
|
|
|
|
- * @param json a JSON file-status payload received from a HttpFSServer server
|
|
|
|
- *
|
|
|
|
- * @return the corresponding <code>FileStatus</code>
|
|
|
|
- */
|
|
|
|
private FileStatus createFileStatus(Path parent, JSONObject json) {
|
|
private FileStatus createFileStatus(Path parent, JSONObject json) {
|
|
String pathSuffix = (String) json.get(PATH_SUFFIX_JSON);
|
|
String pathSuffix = (String) json.get(PATH_SUFFIX_JSON);
|
|
Path path = (pathSuffix.equals("")) ? parent : new Path(parent, pathSuffix);
|
|
Path path = (pathSuffix.equals("")) ? parent : new Path(parent, pathSuffix);
|
|
@@ -828,9 +809,9 @@ public class HttpFSFileSystem extends FileSystem {
|
|
params.put(OP_PARAM, Operation.GETCONTENTSUMMARY.toString());
|
|
params.put(OP_PARAM, Operation.GETCONTENTSUMMARY.toString());
|
|
HttpURLConnection conn =
|
|
HttpURLConnection conn =
|
|
getConnection(Operation.GETCONTENTSUMMARY.getMethod(), params, f, true);
|
|
getConnection(Operation.GETCONTENTSUMMARY.getMethod(), params, f, true);
|
|
- validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
|
|
- JSONObject json =
|
|
|
|
- (JSONObject) ((JSONObject) jsonParse(conn)).get(CONTENT_SUMMARY_JSON);
|
|
|
|
|
|
+ HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
|
|
+ JSONObject json = (JSONObject) ((JSONObject)
|
|
|
|
+ HttpFSUtils.jsonParse(conn)).get(CONTENT_SUMMARY_JSON);
|
|
return new ContentSummary((Long) json.get(CONTENT_SUMMARY_LENGTH_JSON),
|
|
return new ContentSummary((Long) json.get(CONTENT_SUMMARY_LENGTH_JSON),
|
|
(Long) json.get(CONTENT_SUMMARY_FILE_COUNT_JSON),
|
|
(Long) json.get(CONTENT_SUMMARY_FILE_COUNT_JSON),
|
|
(Long) json.get(CONTENT_SUMMARY_DIRECTORY_COUNT_JSON),
|
|
(Long) json.get(CONTENT_SUMMARY_DIRECTORY_COUNT_JSON),
|
|
@@ -846,9 +827,9 @@ public class HttpFSFileSystem extends FileSystem {
|
|
params.put(OP_PARAM, Operation.GETFILECHECKSUM.toString());
|
|
params.put(OP_PARAM, Operation.GETFILECHECKSUM.toString());
|
|
HttpURLConnection conn =
|
|
HttpURLConnection conn =
|
|
getConnection(Operation.GETFILECHECKSUM.getMethod(), params, f, true);
|
|
getConnection(Operation.GETFILECHECKSUM.getMethod(), params, f, true);
|
|
- validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
|
|
- final JSONObject json =
|
|
|
|
- (JSONObject) ((JSONObject) jsonParse(conn)).get(FILE_CHECKSUM_JSON);
|
|
|
|
|
|
+ HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
|
|
|
+ final JSONObject json = (JSONObject) ((JSONObject)
|
|
|
|
+ HttpFSUtils.jsonParse(conn)).get(FILE_CHECKSUM_JSON);
|
|
return new FileChecksum() {
|
|
return new FileChecksum() {
|
|
@Override
|
|
@Override
|
|
public String getAlgorithmName() {
|
|
public String getAlgorithmName() {
|
|
@@ -877,4 +858,56 @@ public class HttpFSFileSystem extends FileSystem {
|
|
};
|
|
};
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ @SuppressWarnings("deprecation")
|
|
|
|
+ public Token<?> getDelegationToken(final String renewer)
|
|
|
|
+ throws IOException {
|
|
|
|
+ return doAsRealUserIfNecessary(new Callable<Token<?>>() {
|
|
|
|
+ @Override
|
|
|
|
+ public Token<?> call() throws Exception {
|
|
|
|
+ return HttpFSKerberosAuthenticator.
|
|
|
|
+ getDelegationToken(uri, httpFSAddr, authToken, renewer);
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public List<Token<?>> getDelegationTokens(final String renewer)
|
|
|
|
+ throws IOException {
|
|
|
|
+ return doAsRealUserIfNecessary(new Callable<List<Token<?>>>() {
|
|
|
|
+ @Override
|
|
|
|
+ public List<Token<?>> call() throws Exception {
|
|
|
|
+ return HttpFSKerberosAuthenticator.
|
|
|
|
+ getDelegationTokens(uri, httpFSAddr, authToken, renewer);
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public long renewDelegationToken(final Token<?> token) throws IOException {
|
|
|
|
+ return doAsRealUserIfNecessary(new Callable<Long>() {
|
|
|
|
+ @Override
|
|
|
|
+ public Long call() throws Exception {
|
|
|
|
+ return HttpFSKerberosAuthenticator.
|
|
|
|
+ renewDelegationToken(uri, authToken, token);
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void cancelDelegationToken(final Token<?> token) throws IOException {
|
|
|
|
+ HttpFSKerberosAuthenticator.
|
|
|
|
+ cancelDelegationToken(uri, authToken, token);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public Token<?> getRenewToken() {
|
|
|
|
+ return delegationToken;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
|
|
|
|
+ delegationToken = token;
|
|
|
|
+ }
|
|
|
|
+
|
|
}
|
|
}
|