|
@@ -26,10 +26,11 @@ import java.net.InetSocketAddress;
|
|
|
import java.net.URI;
|
|
|
import java.net.URISyntaxException;
|
|
|
import java.net.URL;
|
|
|
+import java.security.PrivilegedExceptionAction;
|
|
|
import java.text.ParseException;
|
|
|
import java.text.SimpleDateFormat;
|
|
|
import java.util.ArrayList;
|
|
|
-import java.util.EnumSet;
|
|
|
+import java.util.Collection;
|
|
|
import java.util.Random;
|
|
|
import java.util.TimeZone;
|
|
|
|
|
@@ -37,7 +38,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.ContentSummary;
|
|
|
-import org.apache.hadoop.fs.CreateFlag;
|
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.FileChecksum;
|
|
@@ -46,9 +46,15 @@ import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
+import org.apache.hadoop.hdfs.server.common.JspHelper;
|
|
|
+import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
|
|
|
+import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
+import org.apache.hadoop.security.Credentials;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.security.token.Token;
|
|
|
+import org.apache.hadoop.security.token.TokenIdentifier;
|
|
|
import org.apache.hadoop.util.Progressable;
|
|
|
import org.xml.sax.Attributes;
|
|
|
import org.xml.sax.InputSource;
|
|
@@ -78,6 +84,9 @@ public class HftpFileSystem extends FileSystem {
|
|
|
|
|
|
public static final String HFTP_TIMEZONE = "UTC";
|
|
|
public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
|
|
|
+ private Token<? extends TokenIdentifier> delegationToken;
|
|
|
+ public static final String HFTP_RENEWER = "fs.hftp.renewer";
|
|
|
+ public static final String HFTP_SERVICE_NAME_KEY = "hdfs.service.host_";
|
|
|
|
|
|
public static final SimpleDateFormat getDateFormat() {
|
|
|
final SimpleDateFormat df = new SimpleDateFormat(HFTP_DATE_FORMAT);
|
|
@@ -93,13 +102,74 @@ public class HftpFileSystem extends FileSystem {
|
|
|
};
|
|
|
|
|
|
@Override
|
|
|
- public void initialize(URI name, Configuration conf) throws IOException {
|
|
|
+ public void initialize(final URI name, final Configuration conf)
|
|
|
+ throws IOException {
|
|
|
super.initialize(name, conf);
|
|
|
setConf(conf);
|
|
|
this.ugi = UserGroupInformation.getCurrentUser();
|
|
|
nnAddr = NetUtils.createSocketAddr(name.toString());
|
|
|
+
|
|
|
+ if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
+ StringBuffer sb = new StringBuffer(HFTP_SERVICE_NAME_KEY);
|
|
|
+ // configuration has the actual service name for this url. Build the key
|
|
|
+ // and get it.
|
|
|
+ final String key = sb.append(NetUtils.normalizeHostName(name.getHost())).
|
|
|
+ append(".").append(name.getPort()).toString();
|
|
|
+
|
|
|
+ LOG.debug("Trying to find DT for " + name + " using key=" + key + "; conf=" + conf.get(key, ""));
|
|
|
+ Text nnServiceNameText = new Text(conf.get(key, ""));
|
|
|
+
|
|
|
+ Collection<Token<? extends TokenIdentifier>> tokens =
|
|
|
+ ugi.getTokens();
|
|
|
+ //try finding a token for this namenode (esp applicable for tasks
|
|
|
+ //using hftp). If there exists one, just set the delegationField
|
|
|
+ for (Token<? extends TokenIdentifier> t : tokens) {
|
|
|
+ if ((t.getService()).equals(nnServiceNameText)) {
|
|
|
+ LOG.debug("Found existing DT for " + name);
|
|
|
+ delegationToken = t;
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //since we don't already have a token, go get one over https
|
|
|
+ try {
|
|
|
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
+ public Object run() throws IOException {
|
|
|
+ StringBuffer sb = new StringBuffer();
|
|
|
+ //try https (on http we NEVER get a delegation token)
|
|
|
+ String nnHttpUrl = "https://" +
|
|
|
+ (sb.append(NetUtils.normalizeHostName(name.getHost()))
|
|
|
+ .append(":").append(conf.getInt("dfs.https.port", 50470))).
|
|
|
+ toString();
|
|
|
+ Credentials c;
|
|
|
+ try {
|
|
|
+ c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl,
|
|
|
+ conf.get(HFTP_RENEWER));
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.info("Couldn't get a delegation token from " + nnHttpUrl +
|
|
|
+ " using https.");
|
|
|
+ //Maybe the server is in unsecure mode (that's bad but okay)
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ for (Token<? extends TokenIdentifier> t : c.getAllTokens()) {
|
|
|
+ //the service field is already set and so setService
|
|
|
+ //is not required
|
|
|
+ delegationToken = t;
|
|
|
+ LOG.debug("Got dt for " + getUri() + ";t.service="
|
|
|
+ +t.getService());
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ public Token<? extends TokenIdentifier> getDelegationToken() {
|
|
|
+ return delegationToken;
|
|
|
+ }
|
|
|
|
|
|
@Override
|
|
|
public URI getUri() {
|
|
@@ -116,7 +186,7 @@ public class HftpFileSystem extends FileSystem {
|
|
|
Construct URL pointing to file on namenode
|
|
|
*/
|
|
|
URL getNamenodeFileURL(Path f) throws IOException {
|
|
|
- return getNamenodeURL("/data" + f.toUri().getPath(), "ugi=" + ugi.getShortUserName());
|
|
|
+ return getNamenodeURL("/data" + f.toUri().getPath(), "ugi=" + getUgiParameter());
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -135,6 +205,20 @@ public class HftpFileSystem extends FileSystem {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * ugi parameter for http connection
|
|
|
+ *
|
|
|
+ * @return user_shortname,group1,group2...
|
|
|
+ */
|
|
|
+ private String getUgiParameter() {
|
|
|
+ StringBuilder ugiParamenter = new StringBuilder(ugi.getShortUserName());
|
|
|
+ for(String g: ugi.getGroupNames()) {
|
|
|
+ ugiParamenter.append(",");
|
|
|
+ ugiParamenter.append(g);
|
|
|
+ }
|
|
|
+ return ugiParamenter.toString();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Open an HTTP connection to the namenode to read file data and metadata.
|
|
|
* @param path The path component of the URL
|
|
@@ -142,6 +226,7 @@ public class HftpFileSystem extends FileSystem {
|
|
|
*/
|
|
|
protected HttpURLConnection openConnection(String path, String query)
|
|
|
throws IOException {
|
|
|
+ query = updateQuery(query);
|
|
|
final URL url = getNamenodeURL(path, query);
|
|
|
HttpURLConnection connection = (HttpURLConnection)url.openConnection();
|
|
|
connection.setRequestMethod("GET");
|
|
@@ -149,9 +234,21 @@ public class HftpFileSystem extends FileSystem {
|
|
|
return connection;
|
|
|
}
|
|
|
|
|
|
+ protected String updateQuery(String query) throws IOException {
|
|
|
+ String tokenString = null;
|
|
|
+ if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
+ if (delegationToken != null) {
|
|
|
+ tokenString = delegationToken.encodeToUrlString();
|
|
|
+ return (query + JspHelper.SET_DELEGATION + tokenString);
|
|
|
+ } // else we are talking to an unsecure cluster
|
|
|
+ }
|
|
|
+ return query;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public FSDataInputStream open(Path f, int buffersize) throws IOException {
|
|
|
- URL u = getNamenodeURL("/data" + f.toUri().getPath(), "ugi=" + ugi.getShortUserName());
|
|
|
+ URL u = getNamenodeURL("/data" + f.toUri().getPath(),
|
|
|
+ "ugi=" + getUgiParameter());
|
|
|
return new FSDataInputStream(new ByteRangeInputStream(u));
|
|
|
}
|
|
|
|
|
@@ -201,7 +298,7 @@ public class HftpFileSystem extends FileSystem {
|
|
|
XMLReader xr = XMLReaderFactory.createXMLReader();
|
|
|
xr.setContentHandler(this);
|
|
|
HttpURLConnection connection = openConnection("/listPaths" + path,
|
|
|
- "ugi=" + ugi.getShortUserName() + (recur? "&recursive=yes" : ""));
|
|
|
+ "ugi=" + getUgiParameter() + (recur? "&recursive=yes" : ""));
|
|
|
|
|
|
InputStream resp = connection.getInputStream();
|
|
|
xr.parse(new InputSource(resp));
|
|
@@ -265,7 +362,7 @@ public class HftpFileSystem extends FileSystem {
|
|
|
|
|
|
private FileChecksum getFileChecksum(String f) throws IOException {
|
|
|
final HttpURLConnection connection = openConnection(
|
|
|
- "/fileChecksum" + f, "ugi=" + ugi.getShortUserName());
|
|
|
+ "/fileChecksum" + f, "ugi=" + getUgiParameter());
|
|
|
try {
|
|
|
final XMLReader xr = XMLReaderFactory.createXMLReader();
|
|
|
xr.setContentHandler(this);
|
|
@@ -352,7 +449,7 @@ public class HftpFileSystem extends FileSystem {
|
|
|
*/
|
|
|
private ContentSummary getContentSummary(String path) throws IOException {
|
|
|
final HttpURLConnection connection = openConnection(
|
|
|
- "/contentSummary" + path, "ugi=" + ugi);
|
|
|
+ "/contentSummary" + path, "ugi=" + getUgiParameter());
|
|
|
InputStream in = null;
|
|
|
try {
|
|
|
in = connection.getInputStream();
|