|
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs;
|
|
import java.io.FileNotFoundException;
|
|
import java.io.FileNotFoundException;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.io.InputStream;
|
|
import java.io.InputStream;
|
|
|
|
+import java.lang.ref.WeakReference;
|
|
import java.net.HttpURLConnection;
|
|
import java.net.HttpURLConnection;
|
|
import java.net.InetSocketAddress;
|
|
import java.net.InetSocketAddress;
|
|
import java.net.URI;
|
|
import java.net.URI;
|
|
@@ -30,9 +31,11 @@ import java.security.PrivilegedExceptionAction;
|
|
import java.text.ParseException;
|
|
import java.text.ParseException;
|
|
import java.text.SimpleDateFormat;
|
|
import java.text.SimpleDateFormat;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
-import java.util.Collection;
|
|
|
|
import java.util.Random;
|
|
import java.util.Random;
|
|
import java.util.TimeZone;
|
|
import java.util.TimeZone;
|
|
|
|
+import java.util.concurrent.DelayQueue;
|
|
|
|
+import java.util.concurrent.Delayed;
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
@@ -46,6 +49,7 @@ import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
|
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
|
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
|
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
|
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
|
|
import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
|
|
@@ -80,16 +84,18 @@ public class HftpFileSystem extends FileSystem {
|
|
HttpURLConnection.setFollowRedirects(true);
|
|
HttpURLConnection.setFollowRedirects(true);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private static final int DEFAULT_PORT = 50470;
|
|
|
|
+ private String nnHttpUrl;
|
|
|
|
+ private URI hdfsURI;
|
|
protected InetSocketAddress nnAddr;
|
|
protected InetSocketAddress nnAddr;
|
|
protected UserGroupInformation ugi;
|
|
protected UserGroupInformation ugi;
|
|
protected final Random ran = new Random();
|
|
protected final Random ran = new Random();
|
|
|
|
|
|
public static final String HFTP_TIMEZONE = "UTC";
|
|
public static final String HFTP_TIMEZONE = "UTC";
|
|
public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
|
|
public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
|
|
- private Token<? extends TokenIdentifier> delegationToken;
|
|
|
|
- public static final String HFTP_RENEWER = "fs.hftp.renewer";
|
|
|
|
|
|
+ private Token<DelegationTokenIdentifier> delegationToken;
|
|
public static final String HFTP_SERVICE_NAME_KEY = "hdfs.service.host_";
|
|
public static final String HFTP_SERVICE_NAME_KEY = "hdfs.service.host_";
|
|
-
|
|
|
|
|
|
+
|
|
public static final SimpleDateFormat getDateFormat() {
|
|
public static final SimpleDateFormat getDateFormat() {
|
|
final SimpleDateFormat df = new SimpleDateFormat(HFTP_DATE_FORMAT);
|
|
final SimpleDateFormat df = new SimpleDateFormat(HFTP_DATE_FORMAT);
|
|
df.setTimeZone(TimeZone.getTimeZone(HFTP_TIMEZONE));
|
|
df.setTimeZone(TimeZone.getTimeZone(HFTP_TIMEZONE));
|
|
@@ -98,11 +104,33 @@ public class HftpFileSystem extends FileSystem {
|
|
|
|
|
|
protected static final ThreadLocal<SimpleDateFormat> df =
|
|
protected static final ThreadLocal<SimpleDateFormat> df =
|
|
new ThreadLocal<SimpleDateFormat>() {
|
|
new ThreadLocal<SimpleDateFormat>() {
|
|
- protected SimpleDateFormat initialValue() {
|
|
|
|
- return getDateFormat();
|
|
|
|
- }
|
|
|
|
- };
|
|
|
|
|
|
+ protected SimpleDateFormat initialValue() {
|
|
|
|
+ return getDateFormat();
|
|
|
|
+ }
|
|
|
|
+ };
|
|
|
|
+
|
|
|
|
+ private static RenewerThread renewer = new RenewerThread();
|
|
|
|
+ static {
|
|
|
|
+ renewer.start();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ protected int getDefaultPort() {
|
|
|
|
+ return DEFAULT_PORT;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public String getCanonicalServiceName() {
|
|
|
|
+ return SecurityUtil.buildDTServiceName(hdfsURI, getDefaultPort());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private String buildUri(String schema, String host, int port) {
|
|
|
|
+ StringBuilder sb = new StringBuilder(schema);
|
|
|
|
+ return sb.append(host).append(":").append(port).toString();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
@Override
|
|
@Override
|
|
public void initialize(final URI name, final Configuration conf)
|
|
public void initialize(final URI name, final Configuration conf)
|
|
throws IOException {
|
|
throws IOException {
|
|
@@ -110,66 +138,79 @@ public class HftpFileSystem extends FileSystem {
|
|
setConf(conf);
|
|
setConf(conf);
|
|
this.ugi = UserGroupInformation.getCurrentUser();
|
|
this.ugi = UserGroupInformation.getCurrentUser();
|
|
nnAddr = NetUtils.createSocketAddr(name.toString());
|
|
nnAddr = NetUtils.createSocketAddr(name.toString());
|
|
-
|
|
|
|
- if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
|
- // configuration has the actual service name for this url. Build the key
|
|
|
|
- // and get it.
|
|
|
|
- final String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY +
|
|
|
|
- SecurityUtil.buildDTServiceName(name, NameNode.DEFAULT_PORT);
|
|
|
|
|
|
+
|
|
|
|
+ nnHttpUrl = buildUri("https://", NetUtils.normalizeHostName(name.getHost()),
|
|
|
|
+ conf.getInt("dfs.https.port", DEFAULT_PORT));
|
|
|
|
+
|
|
|
|
+ // if one uses RPC port different from the Default one,
|
|
|
|
+ // one should specify what is the setvice name for this delegation token
|
|
|
|
+ // otherwise it is hostname:RPC_PORT
|
|
|
|
+ String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+
|
|
|
|
+ SecurityUtil.buildDTServiceName(name, DEFAULT_PORT);
|
|
|
|
+ LOG.debug("Trying to find DT for " + name + " using key=" + key +
|
|
|
|
+ "; conf=" + conf.get(key, ""));
|
|
|
|
+ String nnServiceName = conf.get(key);
|
|
|
|
+ int nnPort = NameNode.DEFAULT_PORT;
|
|
|
|
+ if (nnServiceName != null) { // get the real port
|
|
|
|
+ nnPort = NetUtils.createSocketAddr(nnServiceName,
|
|
|
|
+ NameNode.DEFAULT_PORT).getPort();
|
|
|
|
+ }
|
|
|
|
|
|
- LOG.debug("Trying to find DT for " + name + " using key=" + key +
|
|
|
|
- "; conf=" + conf.get(key, ""));
|
|
|
|
- Text nnServiceNameText = new Text(conf.get(key, ""));
|
|
|
|
|
|
+ try {
|
|
|
|
+ hdfsURI = new URI(buildUri("hdfs://", nnAddr.getHostName(), nnPort));
|
|
|
|
+ } catch (URISyntaxException ue) {
|
|
|
|
+ throw new IOException("bad uri for hdfs", ue);
|
|
|
|
+ }
|
|
|
|
|
|
- Collection<Token<? extends TokenIdentifier>> tokens = ugi.getTokens();
|
|
|
|
|
|
+ if (UserGroupInformation.isSecurityEnabled()) {
|
|
//try finding a token for this namenode (esp applicable for tasks
|
|
//try finding a token for this namenode (esp applicable for tasks
|
|
//using hftp). If there exists one, just set the delegationField
|
|
//using hftp). If there exists one, just set the delegationField
|
|
- for (Token<? extends TokenIdentifier> t : tokens) {
|
|
|
|
- if ((t.getService()).equals(nnServiceNameText)) {
|
|
|
|
|
|
+ String canonicalName = getCanonicalServiceName();
|
|
|
|
+ for (Token<? extends TokenIdentifier> t : ugi.getTokens()) {
|
|
|
|
+ if (DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(t.getKind()) &&
|
|
|
|
+ t.getService().toString().equals(canonicalName)) {
|
|
LOG.debug("Found existing DT for " + name);
|
|
LOG.debug("Found existing DT for " + name);
|
|
- delegationToken = t;
|
|
|
|
- return;
|
|
|
|
|
|
+ delegationToken = (Token<DelegationTokenIdentifier>) t;
|
|
|
|
+ break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
//since we don't already have a token, go get one over https
|
|
//since we don't already have a token, go get one over https
|
|
- try {
|
|
|
|
- ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
|
- public Object run() throws IOException {
|
|
|
|
- StringBuffer sb = new StringBuffer();
|
|
|
|
- //try https (on http we NEVER get a delegation token)
|
|
|
|
- String nnHttpUrl = "https://" +
|
|
|
|
- (sb.append(NetUtils.normalizeHostName(name.getHost()))
|
|
|
|
- .append(":").append(conf.getInt("dfs.https.port", 50470))).
|
|
|
|
- toString();
|
|
|
|
- Credentials c;
|
|
|
|
- try {
|
|
|
|
- c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl,
|
|
|
|
- conf.get(HFTP_RENEWER));
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- LOG.info("Couldn't get a delegation token from " + nnHttpUrl +
|
|
|
|
- " using https.");
|
|
|
|
- //Maybe the server is in unsecure mode (that's bad but okay)
|
|
|
|
- return null;
|
|
|
|
- }
|
|
|
|
- for (Token<? extends TokenIdentifier> t : c.getAllTokens()) {
|
|
|
|
- //the service field is already set and so setService
|
|
|
|
- //is not required
|
|
|
|
- delegationToken = t;
|
|
|
|
- LOG.debug("Got dt for " + getUri() + ";t.service="
|
|
|
|
- +t.getService());
|
|
|
|
- }
|
|
|
|
- return null;
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
- throw new RuntimeException(e);
|
|
|
|
|
|
+ if (delegationToken == null) {
|
|
|
|
+ delegationToken =
|
|
|
|
+ (Token<DelegationTokenIdentifier>) getDelegationToken(null);
|
|
|
|
+ renewer.addTokenToRenew(this);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
- public Token<? extends TokenIdentifier> getDelegationToken() {
|
|
|
|
- return delegationToken;
|
|
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public Token<?> getDelegationToken(final String renewer) throws IOException {
|
|
|
|
+ try {
|
|
|
|
+ return ugi.doAs(new PrivilegedExceptionAction<Token<?>>() {
|
|
|
|
+ public Token<?> run() throws IOException {
|
|
|
|
+ Credentials c;
|
|
|
|
+ try {
|
|
|
|
+ c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl, renewer);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ LOG.info("Couldn't get a delegation token from " + nnHttpUrl +
|
|
|
|
+ " using https.");
|
|
|
|
+ LOG.debug("error was ", e);
|
|
|
|
+ //Maybe the server is in unsecure mode (that's bad but okay)
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ for (Token<? extends TokenIdentifier> t : c.getAllTokens()) {
|
|
|
|
+ LOG.debug("Got dt for " + getUri() + ";t.service="
|
|
|
|
+ +t.getService());
|
|
|
|
+ t.setService(new Text(getCanonicalServiceName()));
|
|
|
|
+ return t;
|
|
|
|
+ }
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ throw new RuntimeException(e);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -238,10 +279,12 @@ public class HftpFileSystem extends FileSystem {
|
|
protected String updateQuery(String query) throws IOException {
|
|
protected String updateQuery(String query) throws IOException {
|
|
String tokenString = null;
|
|
String tokenString = null;
|
|
if (UserGroupInformation.isSecurityEnabled()) {
|
|
if (UserGroupInformation.isSecurityEnabled()) {
|
|
- if (delegationToken != null) {
|
|
|
|
- tokenString = delegationToken.encodeToUrlString();
|
|
|
|
- return (query + JspHelper.SET_DELEGATION + tokenString);
|
|
|
|
- } // else we are talking to an unsecure cluster
|
|
|
|
|
|
+ synchronized (this) {
|
|
|
|
+ if (delegationToken != null) {
|
|
|
|
+ tokenString = delegationToken.encodeToUrlString();
|
|
|
|
+ return (query + JspHelper.SET_DELEGATION + tokenString);
|
|
|
|
+ } // else we are talking to an insecure cluster
|
|
|
|
+ }
|
|
}
|
|
}
|
|
return query;
|
|
return query;
|
|
}
|
|
}
|
|
@@ -520,4 +563,157 @@ public class HftpFileSystem extends FileSystem {
|
|
final ContentSummary cs = new ContentSummaryParser().getContentSummary(s);
|
|
final ContentSummary cs = new ContentSummaryParser().getContentSummary(s);
|
|
return cs != null? cs: super.getContentSummary(f);
|
|
return cs != null? cs: super.getContentSummary(f);
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * An action that will renew and replace the hftp file system's delegation
|
|
|
|
+ * tokens automatically.
|
|
|
|
+ */
|
|
|
|
+ private static class RenewAction implements Delayed {
|
|
|
|
+ // when should the renew happen
|
|
|
|
+ private long timestamp;
|
|
|
|
+ // a weak reference to the file system so that it can be garbage collected
|
|
|
|
+ private final WeakReference<HftpFileSystem> weakFs;
|
|
|
|
+
|
|
|
|
+ RenewAction(long timestamp, HftpFileSystem fs) {
|
|
|
|
+ this.timestamp = timestamp;
|
|
|
|
+ this.weakFs = new WeakReference<HftpFileSystem>(fs);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Get the delay until this event should happen.
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public long getDelay(TimeUnit unit) {
|
|
|
|
+ long millisLeft = timestamp - System.currentTimeMillis();
|
|
|
|
+ return unit.convert(millisLeft, TimeUnit.MILLISECONDS);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Compare two events in the same queue.
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public int compareTo(Delayed o) {
|
|
|
|
+ if (o.getClass() != RenewAction.class) {
|
|
|
|
+ throw new IllegalArgumentException("Illegal comparision to non-RenewAction");
|
|
|
|
+ }
|
|
|
|
+ RenewAction other = (RenewAction) o;
|
|
|
|
+ return timestamp < other.timestamp ? -1 :
|
|
|
|
+ (timestamp == other.timestamp ? 0 : 1);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public int hashCode() {
|
|
|
|
+ assert false : "hashCode not designed";
|
|
|
|
+ return 33;
|
|
|
|
+ }
|
|
|
|
+ /**
|
|
|
|
+ * equals
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public boolean equals(Object o) {
|
|
|
|
+ if(!( o instanceof Delayed))
|
|
|
|
+ return false;
|
|
|
|
+
|
|
|
|
+ return compareTo((Delayed) o) == 0;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Set a new time for the renewal. Can only be called when the action
|
|
|
|
+ * is not in the queue.
|
|
|
|
+ * @param newTime the new time
|
|
|
|
+ */
|
|
|
|
+ public void setNewTime(long newTime) {
|
|
|
|
+ timestamp = newTime;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Renew or replace the delegation token for this file system.
|
|
|
|
+ * @return
|
|
|
|
+ * @throws IOException
|
|
|
|
+ */
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
|
+ public boolean renew() throws IOException, InterruptedException {
|
|
|
|
+ final HftpFileSystem fs = weakFs.get();
|
|
|
|
+ if (fs != null) {
|
|
|
|
+ synchronized (fs) {
|
|
|
|
+ fs.ugi.doAs(new PrivilegedExceptionAction<Void>() {
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public Void run() throws Exception {
|
|
|
|
+ try {
|
|
|
|
+ DelegationTokenFetcher.renewDelegationToken(fs.nnHttpUrl,
|
|
|
|
+ fs.delegationToken);
|
|
|
|
+ } catch (IOException ie) {
|
|
|
|
+ try {
|
|
|
|
+ fs.delegationToken =
|
|
|
|
+ (Token<DelegationTokenIdentifier>) fs.getDelegationToken(null);
|
|
|
|
+ } catch (IOException ie2) {
|
|
|
|
+ throw new IOException("Can't renew or get new delegation token ",
|
|
|
|
+ ie);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return fs != null;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public String toString() {
|
|
|
|
+ StringBuilder result = new StringBuilder();
|
|
|
|
+ HftpFileSystem fs = weakFs.get();
|
|
|
|
+ if (fs == null) {
|
|
|
|
+ return "evaporated token renew";
|
|
|
|
+ }
|
|
|
|
+ synchronized (fs) {
|
|
|
|
+ result.append(fs.delegationToken);
|
|
|
|
+ }
|
|
|
|
+ result.append(" renew in ");
|
|
|
|
+ result.append(getDelay(TimeUnit.SECONDS));
|
|
|
|
+ result.append(" secs");
|
|
|
|
+ return result.toString();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * A daemon thread that waits for the next file system to renew.
|
|
|
|
+ */
|
|
|
|
+ private static class RenewerThread extends Thread {
|
|
|
|
+ private DelayQueue<RenewAction> queue = new DelayQueue<RenewAction>();
|
|
|
|
+ // wait for 95% of a day between renewals
|
|
|
|
+ private static final int RENEW_CYCLE = (int) (0.95 * 24 * 60 * 60 * 1000);
|
|
|
|
+
|
|
|
|
+ public RenewerThread() {
|
|
|
|
+ super("HFTP Delegation Token Renewer");
|
|
|
|
+ setDaemon(true);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void addTokenToRenew(HftpFileSystem fs) {
|
|
|
|
+ queue.add(new RenewAction(RENEW_CYCLE + System.currentTimeMillis(),fs));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void run() {
|
|
|
|
+ RenewAction action = null;
|
|
|
|
+ while (true) {
|
|
|
|
+ try {
|
|
|
|
+ action = queue.take();
|
|
|
|
+ if (action.renew()) {
|
|
|
|
+ action.setNewTime(RENEW_CYCLE + System.currentTimeMillis());
|
|
|
|
+ queue.add(action);
|
|
|
|
+ }
|
|
|
|
+ action = null;
|
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
|
+ return;
|
|
|
|
+ } catch (Exception ie) {
|
|
|
|
+ if (action != null) {
|
|
|
|
+ LOG.warn("Failure to renew token " + action, ie);
|
|
|
|
+ } else {
|
|
|
|
+ LOG.warn("Failure in renew queue", ie);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|