|
@@ -27,6 +27,7 @@ import org.apache.hadoop.ipc.Server;
|
|
|
import org.apache.hadoop.net.Node;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.authorize.PolicyProvider;
|
|
|
+import org.apache.hadoop.yarn.YarnException;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
@@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
|
|
|
+import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
|
|
|
import org.apache.hadoop.yarn.service.AbstractService;
|
|
|
import org.apache.hadoop.yarn.util.RackResolver;
|
|
|
|
|
@@ -67,6 +69,7 @@ public class ResourceTrackerService extends AbstractService implements
|
|
|
private final NMLivelinessMonitor nmLivelinessMonitor;
|
|
|
private final RMContainerTokenSecretManager containerTokenSecretManager;
|
|
|
|
|
|
+ private long nextHeartBeatInterval;
|
|
|
private Server server;
|
|
|
private InetSocketAddress resourceTrackerAddress;
|
|
|
|
|
@@ -100,6 +103,14 @@ public class ResourceTrackerService extends AbstractService implements
|
|
|
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
|
|
|
|
|
|
RackResolver.init(conf);
|
|
|
+ nextHeartBeatInterval =
|
|
|
+ conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
|
|
|
+ YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
|
|
|
+ if (nextHeartBeatInterval <= 0) {
|
|
|
+ throw new YarnException("Invalid Configuration. "
|
|
|
+ + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS
|
|
|
+ + " should be larger than 0.");
|
|
|
+ }
|
|
|
super.init(conf);
|
|
|
}
|
|
|
|
|
@@ -223,9 +234,6 @@ public class ResourceTrackerService extends AbstractService implements
|
|
|
new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
|
|
|
return shutDown;
|
|
|
}
|
|
|
-
|
|
|
- NodeHeartbeatResponse nodeHeartBeatResponse = recordFactory
|
|
|
- .newRecordInstance(NodeHeartbeatResponse.class);
|
|
|
|
|
|
// 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
|
|
|
NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse();
|
|
@@ -246,10 +254,11 @@ public class ResourceTrackerService extends AbstractService implements
|
|
|
}
|
|
|
|
|
|
// Heartbeat response
|
|
|
- nodeHeartBeatResponse.setResponseId(lastNodeHeartbeatResponse.getResponseId() + 1);
|
|
|
+ NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
|
|
|
+ .newNodeHeartbeatResponse(lastNodeHeartbeatResponse.
|
|
|
+ getResponseId() + 1, NodeAction.NORMAL, null, null, null,
|
|
|
+ nextHeartBeatInterval);
|
|
|
rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse);
|
|
|
- nodeHeartBeatResponse.setNodeAction(NodeAction.NORMAL);
|
|
|
-
|
|
|
// Check if node's masterKey needs to be updated and if the currentKey has
|
|
|
// roller over, send it across
|
|
|
if (isSecurityEnabled()) {
|