|
@@ -21,7 +21,6 @@ package org.apache.hadoop.hdfs;
|
|
|
import java.io.FileNotFoundException;
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
|
-import java.lang.ref.WeakReference;
|
|
|
import java.net.HttpURLConnection;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.net.URI;
|
|
@@ -31,11 +30,9 @@ import java.security.PrivilegedExceptionAction;
|
|
|
import java.text.ParseException;
|
|
|
import java.text.SimpleDateFormat;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Collection;
|
|
|
import java.util.Random;
|
|
|
import java.util.TimeZone;
|
|
|
-import java.util.concurrent.DelayQueue;
|
|
|
-import java.util.concurrent.Delayed;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
@@ -49,7 +46,6 @@ import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
|
|
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
|
|
@@ -84,18 +80,16 @@ public class HftpFileSystem extends FileSystem {
|
|
|
HttpURLConnection.setFollowRedirects(true);
|
|
|
}
|
|
|
|
|
|
- private static final int DEFAULT_PORT = 50470;
|
|
|
- private String nnHttpUrl;
|
|
|
- private URI hdfsURI;
|
|
|
protected InetSocketAddress nnAddr;
|
|
|
protected UserGroupInformation ugi;
|
|
|
protected final Random ran = new Random();
|
|
|
|
|
|
public static final String HFTP_TIMEZONE = "UTC";
|
|
|
public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
|
|
|
- private Token<DelegationTokenIdentifier> delegationToken;
|
|
|
+ private Token<? extends TokenIdentifier> delegationToken;
|
|
|
+ public static final String HFTP_RENEWER = "fs.hftp.renewer";
|
|
|
public static final String HFTP_SERVICE_NAME_KEY = "hdfs.service.host_";
|
|
|
-
|
|
|
+
|
|
|
public static final SimpleDateFormat getDateFormat() {
|
|
|
final SimpleDateFormat df = new SimpleDateFormat(HFTP_DATE_FORMAT);
|
|
|
df.setTimeZone(TimeZone.getTimeZone(HFTP_TIMEZONE));
|
|
@@ -104,33 +98,11 @@ public class HftpFileSystem extends FileSystem {
|
|
|
|
|
|
protected static final ThreadLocal<SimpleDateFormat> df =
|
|
|
new ThreadLocal<SimpleDateFormat>() {
|
|
|
- protected SimpleDateFormat initialValue() {
|
|
|
- return getDateFormat();
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
- private static RenewerThread renewer = new RenewerThread();
|
|
|
- static {
|
|
|
- renewer.start();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- protected int getDefaultPort() {
|
|
|
- return DEFAULT_PORT;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public String getCanonicalServiceName() {
|
|
|
- return SecurityUtil.buildDTServiceName(hdfsURI, getDefaultPort());
|
|
|
- }
|
|
|
-
|
|
|
- private String buildUri(String schema, String host, int port) {
|
|
|
- StringBuilder sb = new StringBuilder(schema);
|
|
|
- return sb.append(host).append(":").append(port).toString();
|
|
|
- }
|
|
|
-
|
|
|
+ protected SimpleDateFormat initialValue() {
|
|
|
+ return getDateFormat();
|
|
|
+ }
|
|
|
+ };
|
|
|
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
@Override
|
|
|
public void initialize(final URI name, final Configuration conf)
|
|
|
throws IOException {
|
|
@@ -138,79 +110,66 @@ public class HftpFileSystem extends FileSystem {
|
|
|
setConf(conf);
|
|
|
this.ugi = UserGroupInformation.getCurrentUser();
|
|
|
nnAddr = NetUtils.createSocketAddr(name.toString());
|
|
|
-
|
|
|
- nnHttpUrl = buildUri("https://", NetUtils.normalizeHostName(name.getHost()),
|
|
|
- conf.getInt("dfs.https.port", DEFAULT_PORT));
|
|
|
-
|
|
|
- // if one uses RPC port different from the Default one,
|
|
|
- // one should specify what is the setvice name for this delegation token
|
|
|
- // otherwise it is hostname:RPC_PORT
|
|
|
- String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+
|
|
|
- SecurityUtil.buildDTServiceName(name, DEFAULT_PORT);
|
|
|
- LOG.debug("Trying to find DT for " + name + " using key=" + key +
|
|
|
- "; conf=" + conf.get(key, ""));
|
|
|
- String nnServiceName = conf.get(key);
|
|
|
- int nnPort = NameNode.DEFAULT_PORT;
|
|
|
- if (nnServiceName != null) { // get the real port
|
|
|
- nnPort = NetUtils.createSocketAddr(nnServiceName,
|
|
|
- NameNode.DEFAULT_PORT).getPort();
|
|
|
- }
|
|
|
+
|
|
|
+ if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
+ // configuration has the actual service name for this url. Build the key
|
|
|
+ // and get it.
|
|
|
+ final String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY +
|
|
|
+ SecurityUtil.buildDTServiceName(name, NameNode.DEFAULT_PORT);
|
|
|
|
|
|
- try {
|
|
|
- hdfsURI = new URI(buildUri("hdfs://", nnAddr.getHostName(), nnPort));
|
|
|
- } catch (URISyntaxException ue) {
|
|
|
- throw new IOException("bad uri for hdfs", ue);
|
|
|
- }
|
|
|
+ LOG.debug("Trying to find DT for " + name + " using key=" + key +
|
|
|
+ "; conf=" + conf.get(key, ""));
|
|
|
+ Text nnServiceNameText = new Text(conf.get(key, ""));
|
|
|
|
|
|
- if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
+ Collection<Token<? extends TokenIdentifier>> tokens = ugi.getTokens();
|
|
|
//try finding a token for this namenode (esp applicable for tasks
|
|
|
//using hftp). If there exists one, just set the delegationField
|
|
|
- String canonicalName = getCanonicalServiceName();
|
|
|
- for (Token<? extends TokenIdentifier> t : ugi.getTokens()) {
|
|
|
- if (DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(t.getKind()) &&
|
|
|
- t.getService().toString().equals(canonicalName)) {
|
|
|
+ for (Token<? extends TokenIdentifier> t : tokens) {
|
|
|
+ if ((t.getService()).equals(nnServiceNameText)) {
|
|
|
LOG.debug("Found existing DT for " + name);
|
|
|
- delegationToken = (Token<DelegationTokenIdentifier>) t;
|
|
|
- break;
|
|
|
+ delegationToken = t;
|
|
|
+ return;
|
|
|
}
|
|
|
}
|
|
|
//since we don't already have a token, go get one over https
|
|
|
- if (delegationToken == null) {
|
|
|
- delegationToken =
|
|
|
- (Token<DelegationTokenIdentifier>) getDelegationToken(null);
|
|
|
- renewer.addTokenToRenew(this);
|
|
|
+ try {
|
|
|
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
+ public Object run() throws IOException {
|
|
|
+ StringBuffer sb = new StringBuffer();
|
|
|
+ //try https (on http we NEVER get a delegation token)
|
|
|
+ String nnHttpUrl = "https://" +
|
|
|
+ (sb.append(NetUtils.normalizeHostName(name.getHost()))
|
|
|
+ .append(":").append(conf.getInt("dfs.https.port", 50470))).
|
|
|
+ toString();
|
|
|
+ Credentials c;
|
|
|
+ try {
|
|
|
+ c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl,
|
|
|
+ conf.get(HFTP_RENEWER));
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.info("Couldn't get a delegation token from " + nnHttpUrl +
|
|
|
+ " using https.");
|
|
|
+ //Maybe the server is in unsecure mode (that's bad but okay)
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ for (Token<? extends TokenIdentifier> t : c.getAllTokens()) {
|
|
|
+ //the service field is already set and so setService
|
|
|
+ //is not required
|
|
|
+ delegationToken = t;
|
|
|
+ LOG.debug("Got dt for " + getUri() + ";t.service="
|
|
|
+ +t.getService());
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
- @Override
|
|
|
- public Token<?> getDelegationToken(final String renewer) throws IOException {
|
|
|
- try {
|
|
|
- return ugi.doAs(new PrivilegedExceptionAction<Token<?>>() {
|
|
|
- public Token<?> run() throws IOException {
|
|
|
- Credentials c;
|
|
|
- try {
|
|
|
- c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl, renewer);
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.info("Couldn't get a delegation token from " + nnHttpUrl +
|
|
|
- " using https.");
|
|
|
- LOG.debug("error was ", e);
|
|
|
- //Maybe the server is in unsecure mode (that's bad but okay)
|
|
|
- return null;
|
|
|
- }
|
|
|
- for (Token<? extends TokenIdentifier> t : c.getAllTokens()) {
|
|
|
- LOG.debug("Got dt for " + getUri() + ";t.service="
|
|
|
- +t.getService());
|
|
|
- t.setService(new Text(getCanonicalServiceName()));
|
|
|
- return t;
|
|
|
- }
|
|
|
- return null;
|
|
|
- }
|
|
|
- });
|
|
|
- } catch (InterruptedException e) {
|
|
|
- throw new RuntimeException(e);
|
|
|
- }
|
|
|
+
|
|
|
+ public Token<? extends TokenIdentifier> getDelegationToken() {
|
|
|
+ return delegationToken;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -279,12 +238,10 @@ public class HftpFileSystem extends FileSystem {
|
|
|
protected String updateQuery(String query) throws IOException {
|
|
|
String tokenString = null;
|
|
|
if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
- synchronized (this) {
|
|
|
- if (delegationToken != null) {
|
|
|
- tokenString = delegationToken.encodeToUrlString();
|
|
|
- return (query + JspHelper.SET_DELEGATION + tokenString);
|
|
|
- } // else we are talking to an insecure cluster
|
|
|
- }
|
|
|
+ if (delegationToken != null) {
|
|
|
+ tokenString = delegationToken.encodeToUrlString();
|
|
|
+ return (query + JspHelper.SET_DELEGATION + tokenString);
|
|
|
+ } // else we are talking to an unsecure cluster
|
|
|
}
|
|
|
return query;
|
|
|
}
|
|
@@ -563,157 +520,4 @@ public class HftpFileSystem extends FileSystem {
|
|
|
final ContentSummary cs = new ContentSummaryParser().getContentSummary(s);
|
|
|
return cs != null? cs: super.getContentSummary(f);
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * An action that will renew and replace the hftp file system's delegation
|
|
|
- * tokens automatically.
|
|
|
- */
|
|
|
- private static class RenewAction implements Delayed {
|
|
|
- // when should the renew happen
|
|
|
- private long timestamp;
|
|
|
- // a weak reference to the file system so that it can be garbage collected
|
|
|
- private final WeakReference<HftpFileSystem> weakFs;
|
|
|
-
|
|
|
- RenewAction(long timestamp, HftpFileSystem fs) {
|
|
|
- this.timestamp = timestamp;
|
|
|
- this.weakFs = new WeakReference<HftpFileSystem>(fs);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Get the delay until this event should happen.
|
|
|
- */
|
|
|
- @Override
|
|
|
- public long getDelay(TimeUnit unit) {
|
|
|
- long millisLeft = timestamp - System.currentTimeMillis();
|
|
|
- return unit.convert(millisLeft, TimeUnit.MILLISECONDS);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Compare two events in the same queue.
|
|
|
- */
|
|
|
- @Override
|
|
|
- public int compareTo(Delayed o) {
|
|
|
- if (o.getClass() != RenewAction.class) {
|
|
|
- throw new IllegalArgumentException("Illegal comparision to non-RenewAction");
|
|
|
- }
|
|
|
- RenewAction other = (RenewAction) o;
|
|
|
- return timestamp < other.timestamp ? -1 :
|
|
|
- (timestamp == other.timestamp ? 0 : 1);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public int hashCode() {
|
|
|
- assert false : "hashCode not designed";
|
|
|
- return 33;
|
|
|
- }
|
|
|
- /**
|
|
|
- * equals
|
|
|
- */
|
|
|
- @Override
|
|
|
- public boolean equals(Object o) {
|
|
|
- if(!( o instanceof Delayed))
|
|
|
- return false;
|
|
|
-
|
|
|
- return compareTo((Delayed) o) == 0;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Set a new time for the renewal. Can only be called when the action
|
|
|
- * is not in the queue.
|
|
|
- * @param newTime the new time
|
|
|
- */
|
|
|
- public void setNewTime(long newTime) {
|
|
|
- timestamp = newTime;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Renew or replace the delegation token for this file system.
|
|
|
- * @return
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- public boolean renew() throws IOException, InterruptedException {
|
|
|
- final HftpFileSystem fs = weakFs.get();
|
|
|
- if (fs != null) {
|
|
|
- synchronized (fs) {
|
|
|
- fs.ugi.doAs(new PrivilegedExceptionAction<Void>() {
|
|
|
-
|
|
|
- @Override
|
|
|
- public Void run() throws Exception {
|
|
|
- try {
|
|
|
- DelegationTokenFetcher.renewDelegationToken(fs.nnHttpUrl,
|
|
|
- fs.delegationToken);
|
|
|
- } catch (IOException ie) {
|
|
|
- try {
|
|
|
- fs.delegationToken =
|
|
|
- (Token<DelegationTokenIdentifier>) fs.getDelegationToken(null);
|
|
|
- } catch (IOException ie2) {
|
|
|
- throw new IOException("Can't renew or get new delegation token ",
|
|
|
- ie);
|
|
|
- }
|
|
|
- }
|
|
|
- return null;
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
- }
|
|
|
- return fs != null;
|
|
|
- }
|
|
|
-
|
|
|
- public String toString() {
|
|
|
- StringBuilder result = new StringBuilder();
|
|
|
- HftpFileSystem fs = weakFs.get();
|
|
|
- if (fs == null) {
|
|
|
- return "evaporated token renew";
|
|
|
- }
|
|
|
- synchronized (fs) {
|
|
|
- result.append(fs.delegationToken);
|
|
|
- }
|
|
|
- result.append(" renew in ");
|
|
|
- result.append(getDelay(TimeUnit.SECONDS));
|
|
|
- result.append(" secs");
|
|
|
- return result.toString();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * A daemon thread that waits for the next file system to renew.
|
|
|
- */
|
|
|
- private static class RenewerThread extends Thread {
|
|
|
- private DelayQueue<RenewAction> queue = new DelayQueue<RenewAction>();
|
|
|
- // wait for 95% of a day between renewals
|
|
|
- private static final int RENEW_CYCLE = (int) (0.95 * 24 * 60 * 60 * 1000);
|
|
|
-
|
|
|
- public RenewerThread() {
|
|
|
- super("HFTP Delegation Token Renewer");
|
|
|
- setDaemon(true);
|
|
|
- }
|
|
|
-
|
|
|
- public void addTokenToRenew(HftpFileSystem fs) {
|
|
|
- queue.add(new RenewAction(RENEW_CYCLE + System.currentTimeMillis(),fs));
|
|
|
- }
|
|
|
-
|
|
|
- public void run() {
|
|
|
- RenewAction action = null;
|
|
|
- while (true) {
|
|
|
- try {
|
|
|
- action = queue.take();
|
|
|
- if (action.renew()) {
|
|
|
- action.setNewTime(RENEW_CYCLE + System.currentTimeMillis());
|
|
|
- queue.add(action);
|
|
|
- }
|
|
|
- action = null;
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- return;
|
|
|
- } catch (Exception ie) {
|
|
|
- if (action != null) {
|
|
|
- LOG.warn("Failure to renew token " + action, ie);
|
|
|
- } else {
|
|
|
- LOG.warn("Failure in renew queue", ie);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
}
|