|
@@ -141,6 +141,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|
|
/* Resource utilization for the node. */
|
|
|
private ResourceUtilization nodeUtilization;
|
|
|
|
|
|
+ /* Track last increment made to Utilization metrics*/
|
|
|
+ private Resource lastUtilIncr = Resources.none();
|
|
|
+
|
|
|
/** Physical resources in the node. */
|
|
|
private volatile Resource physicalResource;
|
|
|
|
|
@@ -399,7 +402,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|
|
this.lastHealthReportTime = System.currentTimeMillis();
|
|
|
this.nodeManagerVersion = nodeManagerVersion;
|
|
|
this.timeStamp = 0;
|
|
|
- this.physicalResource = physResource;
|
|
|
+ // If physicalResource is not available, capability is a reasonable guess
|
|
|
+ this.physicalResource = physResource==null ? capability : physResource;
|
|
|
|
|
|
this.latestNodeHeartBeatResponse.setResponseId(0);
|
|
|
|
|
@@ -548,6 +552,37 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void clearContributionToUtilizationMetrics() {
|
|
|
+ ClusterMetrics metrics = ClusterMetrics.getMetrics();
|
|
|
+ metrics.decrUtilizedMB(lastUtilIncr.getMemorySize());
|
|
|
+ metrics.decrUtilizedVirtualCores(lastUtilIncr.getVirtualCores());
|
|
|
+ lastUtilIncr = Resources.none();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void updateClusterUtilizationMetrics() {
|
|
|
+ // Update cluster utilization metrics
|
|
|
+ ClusterMetrics metrics = ClusterMetrics.getMetrics();
|
|
|
+ Resource prevIncr = lastUtilIncr;
|
|
|
+
|
|
|
+ if (this.nodeUtilization == null) {
|
|
|
+ lastUtilIncr = Resources.none();
|
|
|
+ } else {
|
|
|
+ /* Scale memory contribution based on configured node size */
|
|
|
+ long newmem = (long)((float)this.nodeUtilization.getPhysicalMemory()
|
|
|
+ / Math.max(1.0f, this.getPhysicalResource().getMemorySize())
|
|
|
+ * this.getTotalCapability().getMemorySize());
|
|
|
+ lastUtilIncr =
|
|
|
+ Resource.newInstance(newmem,
|
|
|
+ (int) (this.nodeUtilization.getCPU()
|
|
|
+ / Math.max(1.0f, this.getPhysicalResource().getVirtualCores())
|
|
|
+ * this.getTotalCapability().getVirtualCores()));
|
|
|
+ }
|
|
|
+ metrics.incrUtilizedMB(lastUtilIncr.getMemorySize() -
|
|
|
+ prevIncr.getMemorySize());
|
|
|
+ metrics.incrUtilizedVirtualCores(lastUtilIncr.getVirtualCores() -
|
|
|
+ prevIncr.getVirtualCores());
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public ResourceUtilization getNodeUtilization() {
|
|
|
this.readLock.lock();
|
|
@@ -706,6 +741,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|
|
|
|
|
private void updateMetricsForRejoinedNode(NodeState previousNodeState) {
|
|
|
ClusterMetrics metrics = ClusterMetrics.getMetrics();
|
|
|
+ // Update utilization metrics
|
|
|
+ this.updateClusterUtilizationMetrics();
|
|
|
|
|
|
switch (previousNodeState) {
|
|
|
case LOST:
|
|
@@ -764,6 +801,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|
|
private void updateMetricsForDeactivatedNode(NodeState initialState,
|
|
|
NodeState finalState) {
|
|
|
ClusterMetrics metrics = ClusterMetrics.getMetrics();
|
|
|
+ // Update utilization metrics
|
|
|
+ clearContributionToUtilizationMetrics();
|
|
|
|
|
|
switch (initialState) {
|
|
|
case RUNNING:
|
|
@@ -1260,6 +1299,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|
|
statusEvent.getOpportunisticContainersStatus());
|
|
|
NodeHealthStatus remoteNodeHealthStatus = updateRMNodeFromStatusEvents(
|
|
|
rmNode, statusEvent);
|
|
|
+ rmNode.updateClusterUtilizationMetrics();
|
|
|
NodeState initialState = rmNode.getState();
|
|
|
boolean isNodeDecommissioning =
|
|
|
initialState.equals(NodeState.DECOMMISSIONING);
|