|
@@ -23,16 +23,17 @@ import java.io.InputStream;
|
|
|
import java.io.IOException;
|
|
|
|
|
|
import java.net.HttpURLConnection;
|
|
|
-import java.net.InetAddress;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.net.URI;
|
|
|
import java.net.URISyntaxException;
|
|
|
import java.net.URL;
|
|
|
|
|
|
+import java.security.PrivilegedExceptionAction;
|
|
|
import java.text.ParseException;
|
|
|
import java.text.SimpleDateFormat;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Collection;
|
|
|
import java.util.Random;
|
|
|
import java.util.TimeZone;
|
|
|
|
|
@@ -55,9 +56,14 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.JspHelper;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.ListPathsServlet;
|
|
|
+import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
|
|
|
+import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
|
import org.apache.hadoop.security.*;
|
|
|
+import org.apache.hadoop.security.token.Token;
|
|
|
+import org.apache.hadoop.security.token.TokenIdentifier;
|
|
|
import org.apache.hadoop.util.Progressable;
|
|
|
|
|
|
/** An implementation of a protocol for accessing filesystems over HTTP.
|
|
@@ -76,6 +82,8 @@ public class HftpFileSystem extends FileSystem {
|
|
|
|
|
|
public static final String HFTP_TIMEZONE = "UTC";
|
|
|
public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
|
|
|
+ private Token<? extends TokenIdentifier> delegationToken;
|
|
|
+ public static final String HFTP_RENEWER = "fs.hftp.renewer";
|
|
|
|
|
|
public static final SimpleDateFormat getDateFormat() {
|
|
|
final SimpleDateFormat df = new SimpleDateFormat(HFTP_DATE_FORMAT);
|
|
@@ -91,14 +99,64 @@ public class HftpFileSystem extends FileSystem {
|
|
|
};
|
|
|
|
|
|
@Override
|
|
|
- public void initialize(URI name, Configuration conf) throws IOException {
|
|
|
+ public void initialize(URI name, final Configuration conf) throws IOException {
|
|
|
super.initialize(name, conf);
|
|
|
setConf(conf);
|
|
|
this.ugi = UserGroupInformation.getCurrentUser();
|
|
|
|
|
|
nnAddr = NetUtils.createSocketAddr(name.toString());
|
|
|
+
|
|
|
+ if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
+ StringBuffer sb = new StringBuffer();
|
|
|
+ final String nnServiceName =
|
|
|
+ (sb.append(NetUtils.normalizeHostName(name.getHost()))
|
|
|
+ .append(":").append(name.getPort())).toString();
|
|
|
+ Text nnServiceNameText = new Text(nnServiceName);
|
|
|
+ Collection<Token<? extends TokenIdentifier>> tokens =
|
|
|
+ ugi.getTokens();
|
|
|
+ //try finding a token for this namenode (esp applicable for tasks
|
|
|
+ //using hftp). If there exists one, just set the delegationField
|
|
|
+ for (Token<? extends TokenIdentifier> t : tokens) {
|
|
|
+ if ((t.getService()).equals(nnServiceNameText)) {
|
|
|
+ delegationToken = t;
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //since we don't already have a token, go get one over https
|
|
|
+ try {
|
|
|
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
+ public Object run() throws IOException {
|
|
|
+ //try https (on http we NEVER get a delegation token)
|
|
|
+ String nnHttpUrl = "https://" + nnServiceName;
|
|
|
+ Credentials c;
|
|
|
+ try {
|
|
|
+ c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl,
|
|
|
+ conf.get(HFTP_RENEWER));
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.info("Couldn't get a delegation token from " + nnHttpUrl +
|
|
|
+ " using https.");
|
|
|
+ //Maybe the server is in unsecure mode (that's bad but okay)
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ for (Token<? extends TokenIdentifier> t : c.getAllTokens()) {
|
|
|
+ //the service field is already set and so setService
|
|
|
+ //is not required
|
|
|
+ delegationToken = t;
|
|
|
+ LOG.debug("Got dt for " + getUri() + ";t.service="
|
|
|
+ +t.getService());
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
+ public Token<? extends TokenIdentifier> getDelegationToken() {
|
|
|
+ return delegationToken;
|
|
|
+ }
|
|
|
|
|
|
@Override
|
|
|
public URI getUri() {
|
|
@@ -118,6 +176,7 @@ public class HftpFileSystem extends FileSystem {
|
|
|
protected HttpURLConnection openConnection(String path, String query)
|
|
|
throws IOException {
|
|
|
try {
|
|
|
+ query = updateQuery(query);
|
|
|
final URL url = new URI("http", null, nnAddr.getHostName(),
|
|
|
nnAddr.getPort(), path, query, null).toURL();
|
|
|
if (LOG.isTraceEnabled()) {
|
|
@@ -128,6 +187,17 @@ public class HftpFileSystem extends FileSystem {
|
|
|
throw (IOException)new IOException().initCause(e);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ protected String updateQuery(String query) throws IOException {
|
|
|
+ String tokenString = null;
|
|
|
+ if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
+ if (delegationToken != null) {
|
|
|
+ tokenString = delegationToken.encodeToUrlString();
|
|
|
+ return (query + JspHelper.SET_DELEGATION + tokenString);
|
|
|
+ } // else we are talking to an unsecure cluster
|
|
|
+ }
|
|
|
+ return query;
|
|
|
+ }
|
|
|
|
|
|
@Override
|
|
|
public FSDataInputStream open(Path f, int buffersize) throws IOException {
|