|
@@ -67,6 +67,8 @@ import org.apache.hadoop.fs.LocalFileSystem;
|
|
|
import org.apache.hadoop.fs.LocalDirAllocator;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
|
|
|
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSecretManager;
|
|
|
import org.apache.hadoop.http.HttpServer;
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.ipc.RPC;
|
|
@@ -92,6 +94,7 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.authorize.AuthorizationException;
|
|
|
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
|
|
|
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
|
|
|
+import org.apache.hadoop.security.token.Token;
|
|
|
import org.apache.hadoop.util.HostsFileReader;
|
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
@@ -121,6 +124,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
static long RETIRE_JOB_INTERVAL;
|
|
|
static long RETIRE_JOB_CHECK_INTERVAL;
|
|
|
|
|
|
+ private final long DELEGATION_TOKEN_GC_INTERVAL = 3600000; // 1 hour
|
|
|
+ private final DelegationTokenSecretManager secretManager;
|
|
|
+
|
|
|
|
|
|
// The interval after which one fault of a tracker will be discarded,
|
|
|
// if there are no faults during this.
|
|
@@ -136,6 +142,20 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
// tracker could be blacklisted across all jobs
|
|
|
private int MAX_BLACKLISTS_PER_TRACKER = 4;
|
|
|
|
|
|
+ //Delegation token related keys
|
|
|
+ public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY =
|
|
|
+ "mapreduce.cluster.delegation.key.update-interval";
|
|
|
+ public static final long DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT =
|
|
|
+ 24*60*60*1000; // 1 day
|
|
|
+ public static final String DELEGATION_TOKEN_RENEW_INTERVAL_KEY =
|
|
|
+ "mapreduce.cluster.delegation.token.renew-interval";
|
|
|
+ public static final long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT =
|
|
|
+ 24*60*60*1000; // 1 day
|
|
|
+ public static final String DELEGATION_TOKEN_MAX_LIFETIME_KEY =
|
|
|
+ "mapreduce.cluster.delegation.token.max-lifetime";
|
|
|
+ public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
|
|
|
+ 7*24*60*60*1000; // 7 days
|
|
|
+
|
|
|
// Approximate number of heartbeats that could arrive JobTracker
|
|
|
// in a second
|
|
|
static final String JT_HEARTBEATS_IN_SECOND = "mapred.heartbeats.in.second";
|
|
@@ -1944,6 +1964,22 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
supergroup = conf.get("mapred.permissions.supergroup", "supergroup");
|
|
|
LOG.info("Starting jobtracker with owner as " + mrOwner.getShortUserName()
|
|
|
+ " and supergroup as " + supergroup);
|
|
|
+ long secretKeyInterval =
|
|
|
+ conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY,
|
|
|
+ DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
|
|
|
+ long tokenMaxLifetime =
|
|
|
+ conf.getLong(DELEGATION_TOKEN_MAX_LIFETIME_KEY,
|
|
|
+ DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
|
|
|
+ long tokenRenewInterval =
|
|
|
+ conf.getLong(DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
|
|
|
+ DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
|
|
|
+ secretManager =
|
|
|
+ new DelegationTokenSecretManager(secretKeyInterval,
|
|
|
+ tokenMaxLifetime,
|
|
|
+ tokenRenewInterval,
|
|
|
+ DELEGATION_TOKEN_GC_INTERVAL);
|
|
|
+ secretManager.startThreads();
|
|
|
+
|
|
|
|
|
|
//
|
|
|
// Grab some static constants
|
|
@@ -2009,7 +2045,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
}
|
|
|
|
|
|
int handlerCount = conf.getInt("mapred.job.tracker.handler.count", 10);
|
|
|
- this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr.getPort(), handlerCount, false, conf);
|
|
|
+ this.interTrackerServer =
|
|
|
+ RPC.getServer(this, addr.getHostName(), addr.getPort(), handlerCount,
|
|
|
+ false, conf, secretManager);
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
Properties p = System.getProperties();
|
|
|
for (Iterator it = p.keySet().iterator(); it.hasNext();) {
|
|
@@ -3726,6 +3764,43 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
updateJobInProgressListeners(event);
|
|
|
}
|
|
|
}
|
|
|
+ /**
|
|
|
+ * Discard a current delegation token.
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public boolean cancelDelegationToken(Token<DelegationTokenIdentifier> token
|
|
|
+ ) throws IOException,
|
|
|
+ InterruptedException {
|
|
|
+ String user = UserGroupInformation.getCurrentUser().getUserName();
|
|
|
+ return secretManager.cancelToken(token, user);
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * Get a new delegation token.
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public Token<DelegationTokenIdentifier>
|
|
|
+ getDelegationToken(Text renewer
|
|
|
+ )throws IOException, InterruptedException {
|
|
|
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
|
+ Text owner = new Text(ugi.getUserName());
|
|
|
+ Text realUser = null;
|
|
|
+ if (ugi.getRealUser() != null) {
|
|
|
+ realUser = new Text(ugi.getRealUser().getUserName());
|
|
|
+ }
|
|
|
+ DelegationTokenIdentifier ident =
|
|
|
+ new DelegationTokenIdentifier(owner, renewer, realUser);
|
|
|
+ return new Token<DelegationTokenIdentifier>(ident, secretManager);
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * Renew a delegation token to extend its lifetime.
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public boolean renewDelegationToken(Token<DelegationTokenIdentifier> token
|
|
|
+ ) throws IOException,
|
|
|
+ InterruptedException {
|
|
|
+ String user = UserGroupInformation.getCurrentUser().getUserName();
|
|
|
+ return secretManager.renewToken(token, user);
|
|
|
+ }
|
|
|
|
|
|
public void initJob(JobInProgress job) {
|
|
|
if (null == job) {
|