|
@@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
import org.apache.hadoop.ipc.Server;
|
|
import org.apache.hadoop.ipc.Server;
|
|
import org.apache.hadoop.net.Node;
|
|
import org.apache.hadoop.net.Node;
|
|
-import org.apache.hadoop.security.UserGroupInformation;
|
|
|
|
import org.apache.hadoop.security.authorize.PolicyProvider;
|
|
import org.apache.hadoop.security.authorize.PolicyProvider;
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
@@ -51,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
|
|
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
|
|
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
|
|
import org.apache.hadoop.yarn.service.AbstractService;
|
|
import org.apache.hadoop.yarn.service.AbstractService;
|
|
@@ -68,6 +68,7 @@ public class ResourceTrackerService extends AbstractService implements
|
|
private final NodesListManager nodesListManager;
|
|
private final NodesListManager nodesListManager;
|
|
private final NMLivelinessMonitor nmLivelinessMonitor;
|
|
private final NMLivelinessMonitor nmLivelinessMonitor;
|
|
private final RMContainerTokenSecretManager containerTokenSecretManager;
|
|
private final RMContainerTokenSecretManager containerTokenSecretManager;
|
|
|
|
+ private final NMTokenSecretManagerInRM nmTokenSecretManager;
|
|
|
|
|
|
private long nextHeartBeatInterval;
|
|
private long nextHeartBeatInterval;
|
|
private Server server;
|
|
private Server server;
|
|
@@ -90,12 +91,14 @@ public class ResourceTrackerService extends AbstractService implements
|
|
public ResourceTrackerService(RMContext rmContext,
|
|
public ResourceTrackerService(RMContext rmContext,
|
|
NodesListManager nodesListManager,
|
|
NodesListManager nodesListManager,
|
|
NMLivelinessMonitor nmLivelinessMonitor,
|
|
NMLivelinessMonitor nmLivelinessMonitor,
|
|
- RMContainerTokenSecretManager containerTokenSecretManager) {
|
|
|
|
|
|
+ RMContainerTokenSecretManager containerTokenSecretManager,
|
|
|
|
+ NMTokenSecretManagerInRM nmTokenSecretManager) {
|
|
super(ResourceTrackerService.class.getName());
|
|
super(ResourceTrackerService.class.getName());
|
|
this.rmContext = rmContext;
|
|
this.rmContext = rmContext;
|
|
this.nodesListManager = nodesListManager;
|
|
this.nodesListManager = nodesListManager;
|
|
this.nmLivelinessMonitor = nmLivelinessMonitor;
|
|
this.nmLivelinessMonitor = nmLivelinessMonitor;
|
|
this.containerTokenSecretManager = containerTokenSecretManager;
|
|
this.containerTokenSecretManager = containerTokenSecretManager;
|
|
|
|
+ this.nmTokenSecretManager = nmTokenSecretManager;
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -197,9 +200,10 @@ public class ResourceTrackerService extends AbstractService implements
|
|
return response;
|
|
return response;
|
|
}
|
|
}
|
|
|
|
|
|
- MasterKey nextMasterKeyForNode =
|
|
|
|
- this.containerTokenSecretManager.getCurrentKey();
|
|
|
|
- response.setMasterKey(nextMasterKeyForNode);
|
|
|
|
|
|
+ response.setContainerTokenMasterKey(containerTokenSecretManager
|
|
|
|
+ .getCurrentKey());
|
|
|
|
+ response.setNMTokenMasterKey(nmTokenSecretManager
|
|
|
|
+ .getCurrentKey());
|
|
|
|
|
|
RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
|
|
RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
|
|
resolve(host), capability);
|
|
resolve(host), capability);
|
|
@@ -292,27 +296,11 @@ public class ResourceTrackerService extends AbstractService implements
|
|
// Heartbeat response
|
|
// Heartbeat response
|
|
NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
|
|
NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
|
|
.newNodeHeartbeatResponse(lastNodeHeartbeatResponse.
|
|
.newNodeHeartbeatResponse(lastNodeHeartbeatResponse.
|
|
- getResponseId() + 1, NodeAction.NORMAL, null, null, null,
|
|
|
|
|
|
+ getResponseId() + 1, NodeAction.NORMAL, null, null, null, null,
|
|
nextHeartBeatInterval);
|
|
nextHeartBeatInterval);
|
|
rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse);
|
|
rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse);
|
|
|
|
|
|
- // Check if node's masterKey needs to be updated and if the currentKey has
|
|
|
|
- // roller over, send it across
|
|
|
|
- boolean shouldSendMasterKey = false;
|
|
|
|
-
|
|
|
|
- MasterKey nextMasterKeyForNode =
|
|
|
|
- this.containerTokenSecretManager.getNextKey();
|
|
|
|
- if (nextMasterKeyForNode != null) {
|
|
|
|
- // nextMasterKeyForNode can be null if there is no outstanding key that
|
|
|
|
- // is in the activation period.
|
|
|
|
- MasterKey nodeKnownMasterKey = request.getLastKnownMasterKey();
|
|
|
|
- if (nodeKnownMasterKey.getKeyId() != nextMasterKeyForNode.getKeyId()) {
|
|
|
|
- shouldSendMasterKey = true;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- if (shouldSendMasterKey) {
|
|
|
|
- nodeHeartBeatResponse.setMasterKey(nextMasterKeyForNode);
|
|
|
|
- }
|
|
|
|
|
|
+ populateKeys(request, nodeHeartBeatResponse);
|
|
|
|
|
|
// 4. Send status to RMNode, saving the latest response.
|
|
// 4. Send status to RMNode, saving the latest response.
|
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
|
@@ -323,6 +311,32 @@ public class ResourceTrackerService extends AbstractService implements
|
|
return nodeHeartBeatResponse;
|
|
return nodeHeartBeatResponse;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private void populateKeys(NodeHeartbeatRequest request,
|
|
|
|
+ NodeHeartbeatResponse nodeHeartBeatResponse) {
|
|
|
|
+
|
|
|
|
+ // Check if node's masterKey needs to be updated and if the currentKey has
|
|
|
|
+ // roller over, send it across
|
|
|
|
+
|
|
|
|
+ // ContainerTokenMasterKey
|
|
|
|
+
|
|
|
|
+ MasterKey nextMasterKeyForNode =
|
|
|
|
+ this.containerTokenSecretManager.getNextKey();
|
|
|
|
+ if (nextMasterKeyForNode != null
|
|
|
|
+ && (request.getLastKnownContainerTokenMasterKey().getKeyId()
|
|
|
|
+ != nextMasterKeyForNode.getKeyId())) {
|
|
|
|
+ nodeHeartBeatResponse.setContainerTokenMasterKey(nextMasterKeyForNode);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // NMTokenMasterKey
|
|
|
|
+
|
|
|
|
+ nextMasterKeyForNode = this.nmTokenSecretManager.getNextKey();
|
|
|
|
+ if (nextMasterKeyForNode != null
|
|
|
|
+ && (request.getLastKnownNMTokenMasterKey().getKeyId()
|
|
|
|
+ != nextMasterKeyForNode.getKeyId())) {
|
|
|
|
+ nodeHeartBeatResponse.setNMTokenMasterKey(nextMasterKeyForNode);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* resolving the network topology.
|
|
* resolving the network topology.
|
|
* @param hostName the hostname of this node.
|
|
* @param hostName the hostname of this node.
|