|
@@ -64,7 +64,7 @@ import org.apache.hadoop.yarn.service.AbstractService;
|
|
import org.apache.hadoop.yarn.util.RackResolver;
|
|
import org.apache.hadoop.yarn.util.RackResolver;
|
|
|
|
|
|
public class ResourceTrackerService extends AbstractService implements
|
|
public class ResourceTrackerService extends AbstractService implements
|
|
- ResourceTracker, RMNodeRemovalListener {
|
|
|
|
|
|
+ ResourceTracker {
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(ResourceTrackerService.class);
|
|
private static final Log LOG = LogFactory.getLog(ResourceTrackerService.class);
|
|
|
|
|
|
@@ -76,13 +76,6 @@ public class ResourceTrackerService extends AbstractService implements
|
|
private final NMLivelinessMonitor nmLivelinessMonitor;
|
|
private final NMLivelinessMonitor nmLivelinessMonitor;
|
|
private final ContainerTokenSecretManager containerTokenSecretManager;
|
|
private final ContainerTokenSecretManager containerTokenSecretManager;
|
|
|
|
|
|
- /* we dont garbage collect on nodes. A node can come back up again and re register,
|
|
|
|
- * so no use garbage collecting. Though admin can break the RM by bouncing
|
|
|
|
- * nodemanagers on different ports again and again.
|
|
|
|
- */
|
|
|
|
- private Map<String, NodeId> nodes = new ConcurrentHashMap<String, NodeId>();
|
|
|
|
- private final AtomicInteger nodeCounter = new AtomicInteger(0);
|
|
|
|
-
|
|
|
|
private Server server;
|
|
private Server server;
|
|
private InetSocketAddress resourceTrackerAddress;
|
|
private InetSocketAddress resourceTrackerAddress;
|
|
|
|
|
|
@@ -95,9 +88,6 @@ public class ResourceTrackerService extends AbstractService implements
|
|
reboot.setHeartbeatResponse(rebootResp);
|
|
reboot.setHeartbeatResponse(rebootResp);
|
|
}
|
|
}
|
|
|
|
|
|
- private final ConcurrentMap<NodeId, HeartbeatResponse> lastHeartBeats
|
|
|
|
- = new ConcurrentHashMap<NodeId, HeartbeatResponse>();
|
|
|
|
-
|
|
|
|
public ResourceTrackerService(RMContext rmContext,
|
|
public ResourceTrackerService(RMContext rmContext,
|
|
NodesListManager nodesListManager,
|
|
NodesListManager nodesListManager,
|
|
NMLivelinessMonitor nmLivelinessMonitor,
|
|
NMLivelinessMonitor nmLivelinessMonitor,
|
|
@@ -151,8 +141,9 @@ public class ResourceTrackerService extends AbstractService implements
|
|
public RegisterNodeManagerResponse registerNodeManager(
|
|
public RegisterNodeManagerResponse registerNodeManager(
|
|
RegisterNodeManagerRequest request) throws YarnRemoteException {
|
|
RegisterNodeManagerRequest request) throws YarnRemoteException {
|
|
|
|
|
|
- String host = request.getHost();
|
|
|
|
- int cmPort = request.getContainerManagerPort();
|
|
|
|
|
|
+ NodeId nodeId = request.getNodeId();
|
|
|
|
+ String host = nodeId.getHost();
|
|
|
|
+ int cmPort = nodeId.getPort();
|
|
int httpPort = request.getHttpPort();
|
|
int httpPort = request.getHttpPort();
|
|
Resource capability = request.getResource();
|
|
Resource capability = request.getResource();
|
|
|
|
|
|
@@ -163,23 +154,24 @@ public class ResourceTrackerService extends AbstractService implements
|
|
throw new IOException("Disallowed NodeManager from " + host);
|
|
throw new IOException("Disallowed NodeManager from " + host);
|
|
}
|
|
}
|
|
|
|
|
|
- String node = host + ":" + cmPort;
|
|
|
|
- NodeId nodeId = mayBeCreateAndGetNodeId(node);
|
|
|
|
-
|
|
|
|
- createNewNode(nodeId, host, cmPort, httpPort, resolve(host), capability);
|
|
|
|
|
|
+ RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort,
|
|
|
|
+ httpPort, resolve(host), capability);
|
|
|
|
+
|
|
|
|
+ if (this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode) != null) {
|
|
|
|
+ throw new IOException("Duplicate registration from the node!");
|
|
|
|
+ }
|
|
|
|
|
|
this.nmLivelinessMonitor.register(nodeId);
|
|
this.nmLivelinessMonitor.register(nodeId);
|
|
|
|
|
|
LOG.info("NodeManager from node " + host +
|
|
LOG.info("NodeManager from node " + host +
|
|
"(cmPort: " + cmPort + " httpPort: " + httpPort + ") "
|
|
"(cmPort: " + cmPort + " httpPort: " + httpPort + ") "
|
|
+ "registered with capability: " + capability.getMemory()
|
|
+ "registered with capability: " + capability.getMemory()
|
|
- + ", assigned nodeId " + nodeId.getId());
|
|
|
|
|
|
+ + ", assigned nodeId " + nodeId);
|
|
|
|
|
|
RegistrationResponse regResponse = recordFactory.newRecordInstance(
|
|
RegistrationResponse regResponse = recordFactory.newRecordInstance(
|
|
RegistrationResponse.class);
|
|
RegistrationResponse.class);
|
|
- regResponse.setNodeId(nodeId);
|
|
|
|
- SecretKey secretKey =
|
|
|
|
- this.containerTokenSecretManager.createAndGetSecretKey(node);
|
|
|
|
|
|
+ SecretKey secretKey = this.containerTokenSecretManager
|
|
|
|
+ .createAndGetSecretKey(nodeId.toString());
|
|
regResponse.setSecretKey(ByteBuffer.wrap(secretKey.getEncoded()));
|
|
regResponse.setSecretKey(ByteBuffer.wrap(secretKey.getEncoded()));
|
|
|
|
|
|
RegisterNodeManagerResponse response = recordFactory
|
|
RegisterNodeManagerResponse response = recordFactory
|
|
@@ -187,7 +179,7 @@ public class ResourceTrackerService extends AbstractService implements
|
|
response.setRegistrationResponse(regResponse);
|
|
response.setRegistrationResponse(regResponse);
|
|
return response;
|
|
return response;
|
|
} catch (IOException ioe) {
|
|
} catch (IOException ioe) {
|
|
- LOG.info("Exception in node registration from " + request.getHost(), ioe);
|
|
|
|
|
|
+ LOG.info("Exception in node registration from " + nodeId.getHost(), ioe);
|
|
throw RPCUtil.getRemoteException(ioe);
|
|
throw RPCUtil.getRemoteException(ioe);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -231,17 +223,18 @@ public class ResourceTrackerService extends AbstractService implements
|
|
.newRecordInstance(NodeHeartbeatResponse.class);
|
|
.newRecordInstance(NodeHeartbeatResponse.class);
|
|
|
|
|
|
// 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
|
|
// 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
|
|
- if (remoteNodeStatus.getResponseId() + 1 == this.lastHeartBeats.get(nodeId)
|
|
|
|
|
|
+ HeartbeatResponse lastHeartbeatResponse = rmNode
|
|
|
|
+ .getLastHeartBeatResponse();
|
|
|
|
+ if (remoteNodeStatus.getResponseId() + 1 == lastHeartbeatResponse
|
|
.getResponseId()) {
|
|
.getResponseId()) {
|
|
LOG.info("Received duplicate heartbeat from node " +
|
|
LOG.info("Received duplicate heartbeat from node " +
|
|
rmNode.getNodeAddress());
|
|
rmNode.getNodeAddress());
|
|
- nodeHeartBeatResponse.setHeartbeatResponse(this.lastHeartBeats
|
|
|
|
- .get(nodeId));
|
|
|
|
|
|
+ nodeHeartBeatResponse.setHeartbeatResponse(lastHeartbeatResponse);
|
|
return nodeHeartBeatResponse;
|
|
return nodeHeartBeatResponse;
|
|
- } else if (remoteNodeStatus.getResponseId() + 1 < this.lastHeartBeats
|
|
|
|
- .get(nodeId).getResponseId()) {
|
|
|
|
|
|
+ } else if (remoteNodeStatus.getResponseId() + 1 < lastHeartbeatResponse
|
|
|
|
+ .getResponseId()) {
|
|
LOG.info("Too far behind rm response id:" +
|
|
LOG.info("Too far behind rm response id:" +
|
|
- this.lastHeartBeats.get(nodeId).getResponseId() + " nm response id:"
|
|
|
|
|
|
+ lastHeartbeatResponse.getResponseId() + " nm response id:"
|
|
+ remoteNodeStatus.getResponseId());
|
|
+ remoteNodeStatus.getResponseId());
|
|
// TODO: Just sending reboot is not enough. Think more.
|
|
// TODO: Just sending reboot is not enough. Think more.
|
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
|
@@ -249,22 +242,20 @@ public class ResourceTrackerService extends AbstractService implements
|
|
return reboot;
|
|
return reboot;
|
|
}
|
|
}
|
|
|
|
|
|
- // 4. Send status to RMNode
|
|
|
|
|
|
+ // Heartbeat response
|
|
|
|
+ HeartbeatResponse latestResponse = recordFactory
|
|
|
|
+ .newRecordInstance(HeartbeatResponse.class);
|
|
|
|
+ latestResponse
|
|
|
|
+ .setResponseId(lastHeartbeatResponse.getResponseId() + 1);
|
|
|
|
+ latestResponse.addAllContainersToCleanup(rmNode.pullContainersToCleanUp());
|
|
|
|
+ latestResponse.addAllApplicationsToCleanup(rmNode.pullAppsToCleanup());
|
|
|
|
+
|
|
|
|
+ // 4. Send status to RMNode, saving the latest response.
|
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
|
new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
|
|
new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
|
|
- remoteNodeStatus.getAllContainers()));
|
|
|
|
|
|
+ remoteNodeStatus.getAllContainers(), latestResponse));
|
|
|
|
|
|
- // Heartbeat response
|
|
|
|
- HeartbeatResponse response = recordFactory
|
|
|
|
- .newRecordInstance(HeartbeatResponse.class);
|
|
|
|
- response
|
|
|
|
- .setResponseId(this.lastHeartBeats.get(nodeId).getResponseId() + 1);
|
|
|
|
- response.addAllContainersToCleanup(rmNode.pullContainersToCleanUp());
|
|
|
|
- response.addAllApplicationsToCleanup(rmNode.pullAppsToCleanup());
|
|
|
|
-
|
|
|
|
- // Save the response
|
|
|
|
- this.lastHeartBeats.put(nodeId, response);
|
|
|
|
- nodeHeartBeatResponse.setHeartbeatResponse(response);
|
|
|
|
|
|
+ nodeHeartBeatResponse.setHeartbeatResponse(latestResponse);
|
|
return nodeHeartBeatResponse;
|
|
return nodeHeartBeatResponse;
|
|
} catch (IOException ioe) {
|
|
} catch (IOException ioe) {
|
|
LOG.info("Exception in heartbeat from node " +
|
|
LOG.info("Exception in heartbeat from node " +
|
|
@@ -295,55 +286,6 @@ public class ResourceTrackerService extends AbstractService implements
|
|
// }
|
|
// }
|
|
}
|
|
}
|
|
|
|
|
|
- private void createNewNode(NodeId nodeId, String hostName, int cmPort,
|
|
|
|
- int httpPort, Node node, Resource capability) throws IOException {
|
|
|
|
-
|
|
|
|
- RMNode rmNode = new RMNodeImpl(nodeId, rmContext, hostName, cmPort,
|
|
|
|
- httpPort, node, capability, this);
|
|
|
|
-
|
|
|
|
- if (this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode) != null) {
|
|
|
|
- throw new IOException("Duplicate registration from the node!");
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Record the new node
|
|
|
|
- synchronized (nodes) {
|
|
|
|
- LOG.info("DEBUG -- Adding " + hostName);
|
|
|
|
- HeartbeatResponse response = recordFactory
|
|
|
|
- .newRecordInstance(HeartbeatResponse.class);
|
|
|
|
- response.setResponseId(0);
|
|
|
|
- this.lastHeartBeats.put(nodeId, response);
|
|
|
|
- nodes.put(rmNode.getNodeAddress(), nodeId);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public void RMNodeRemoved(NodeId nodeId) {
|
|
|
|
- RMNode node = null;
|
|
|
|
- synchronized (nodes) {
|
|
|
|
- node = this.rmContext.getRMNodes().get(nodeId);
|
|
|
|
- if (node != null) {
|
|
|
|
- nodes.remove(node.getNodeAddress());
|
|
|
|
- this.lastHeartBeats.remove(nodeId);
|
|
|
|
- } else {
|
|
|
|
- LOG.warn("Unknown node " + nodeId + " unregistered");
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if (node != null) {
|
|
|
|
- this.rmContext.getRMNodes().remove(nodeId);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private NodeId mayBeCreateAndGetNodeId(String node) {
|
|
|
|
- NodeId nodeId;
|
|
|
|
- nodeId = nodes.get(node);
|
|
|
|
- if (nodeId == null) {
|
|
|
|
- nodeId = recordFactory.newRecordInstance(NodeId.class);
|
|
|
|
- nodeId.setId(nodeCounter.getAndIncrement());
|
|
|
|
- }
|
|
|
|
- return nodeId;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* resolving the network topology.
|
|
* resolving the network topology.
|
|
* @param hostName the hostname of this node.
|
|
* @param hostName the hostname of this node.
|