|
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.ipc.RPC;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.service.AbstractService;
|
|
|
+import org.apache.hadoop.util.VersionUtil;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
|
@@ -63,6 +64,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
|
|
+import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
|
|
@@ -84,6 +86,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|
|
private ResourceTracker resourceTracker;
|
|
|
private Resource totalResource;
|
|
|
private int httpPort;
|
|
|
+ private String nodeManagerVersionId;
|
|
|
+ private String minimumResourceManagerVersion;
|
|
|
private volatile boolean isStopped;
|
|
|
private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
|
|
private boolean tokenKeepAliveEnabled;
|
|
@@ -138,6 +142,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|
|
this.tokenRemovalDelayMs =
|
|
|
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
|
|
|
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
|
|
|
+
|
|
|
+ this.minimumResourceManagerVersion = conf.get(
|
|
|
+ YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION,
|
|
|
+ YarnConfiguration.DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION);
|
|
|
|
|
|
// Default duration to track stopped containers on nodemanager is 10Min.
|
|
|
// This should not be assigned very large value as it will remember all the
|
|
@@ -168,6 +176,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|
|
// NodeManager is the last service to start, so NodeId is available.
|
|
|
this.nodeId = this.context.getNodeId();
|
|
|
this.httpPort = this.context.getHttpPort();
|
|
|
+ this.nodeManagerVersionId = YarnVersionInfo.getVersion();
|
|
|
try {
|
|
|
// Registration has to be in start so that ContainerManager can get the
|
|
|
// perNM tokens needed to authenticate ContainerTokens.
|
|
@@ -235,6 +244,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|
|
request.setHttpPort(this.httpPort);
|
|
|
request.setResource(this.totalResource);
|
|
|
request.setNodeId(this.nodeId);
|
|
|
+ request.setNMVersion(this.nodeManagerVersionId);
|
|
|
RegisterNodeManagerResponse regNMResponse =
|
|
|
resourceTracker.registerNodeManager(request);
|
|
|
this.rmIdentifier = regNMResponse.getRMIdentifier();
|
|
@@ -248,6 +258,26 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|
|
+ message);
|
|
|
}
|
|
|
|
|
|
+ // if ResourceManager version is too old then shutdown
|
|
|
+ if (!minimumResourceManagerVersion.equals("NONE")){
|
|
|
+ if (minimumResourceManagerVersion.equals("EqualToNM")){
|
|
|
+ minimumResourceManagerVersion = nodeManagerVersionId;
|
|
|
+ }
|
|
|
+ String rmVersion = regNMResponse.getRMVersion();
|
|
|
+ if (rmVersion == null) {
|
|
|
+ String message = "The Resource Manager's did not return a version. "
|
|
|
+ + "Valid version cannot be checked.";
|
|
|
+ throw new YarnRuntimeException("Shutting down the Node Manager. "
|
|
|
+ + message);
|
|
|
+ }
|
|
|
+ if (VersionUtil.compareVersions(rmVersion,minimumResourceManagerVersion) < 0) {
|
|
|
+ String message = "The Resource Manager's version ("
|
|
|
+ + rmVersion +") is less than the minimum "
|
|
|
+ + "allowed version " + minimumResourceManagerVersion;
|
|
|
+ throw new YarnRuntimeException("Shutting down the Node Manager on RM "
|
|
|
+ + "version error, " + message);
|
|
|
+ }
|
|
|
+ }
|
|
|
MasterKey masterKey = regNMResponse.getContainerTokenMasterKey();
|
|
|
// do this now so that its set before we start heartbeating to RM
|
|
|
// It is expected that status updater is started by this point and
|