|
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
import org.apache.hadoop.yarn.service.ServiceScheduler;
|
|
import org.apache.hadoop.yarn.service.ServiceScheduler;
|
|
|
|
+import org.apache.hadoop.yarn.service.api.records.Artifact;
|
|
import org.apache.hadoop.yarn.service.api.records.ContainerState;
|
|
import org.apache.hadoop.yarn.service.api.records.ContainerState;
|
|
import org.apache.hadoop.yarn.service.component.Component;
|
|
import org.apache.hadoop.yarn.service.component.Component;
|
|
import org.apache.hadoop.yarn.service.component.ComponentEvent;
|
|
import org.apache.hadoop.yarn.service.component.ComponentEvent;
|
|
@@ -151,10 +152,19 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
|
@Override public void transition(ComponentInstance compInstance,
|
|
@Override public void transition(ComponentInstance compInstance,
|
|
ComponentInstanceEvent event) {
|
|
ComponentInstanceEvent event) {
|
|
// Query container status for ip and host
|
|
// Query container status for ip and host
|
|
|
|
+ boolean cancelOnSuccess = true;
|
|
|
|
+ if (compInstance.getCompSpec().getArtifact() != null && compInstance
|
|
|
|
+ .getCompSpec().getArtifact().getType() == Artifact.TypeEnum.DOCKER) {
|
|
|
|
+ // A docker container might get a different IP if the container is
|
|
|
|
+ // relaunched by the NM, so we need to keep checking the status.
|
|
|
|
+ // This is a temporary fix until the NM provides a callback for
|
|
|
|
+ // container relaunch (see YARN-8265).
|
|
|
|
+ cancelOnSuccess = false;
|
|
|
|
+ }
|
|
compInstance.containerStatusFuture =
|
|
compInstance.containerStatusFuture =
|
|
compInstance.scheduler.executorService.scheduleAtFixedRate(
|
|
compInstance.scheduler.executorService.scheduleAtFixedRate(
|
|
new ContainerStatusRetriever(compInstance.scheduler,
|
|
new ContainerStatusRetriever(compInstance.scheduler,
|
|
- event.getContainerId(), compInstance), 0, 1,
|
|
|
|
|
|
+ event.getContainerId(), compInstance, cancelOnSuccess), 0, 1,
|
|
TimeUnit.SECONDS);
|
|
TimeUnit.SECONDS);
|
|
long containerStartTime = System.currentTimeMillis();
|
|
long containerStartTime = System.currentTimeMillis();
|
|
try {
|
|
try {
|
|
@@ -373,14 +383,26 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
|
this.status = status;
|
|
this.status = status;
|
|
org.apache.hadoop.yarn.service.api.records.Container container =
|
|
org.apache.hadoop.yarn.service.api.records.Container container =
|
|
getCompSpec().getContainer(status.getContainerId().toString());
|
|
getCompSpec().getContainer(status.getContainerId().toString());
|
|
|
|
+ boolean doRegistryUpdate = true;
|
|
if (container != null) {
|
|
if (container != null) {
|
|
- container.setIp(StringUtils.join(",", status.getIPs()));
|
|
|
|
|
|
+ String existingIP = container.getIp();
|
|
|
|
+ String newIP = StringUtils.join(",", status.getIPs());
|
|
|
|
+ container.setIp(newIP);
|
|
container.setHostname(status.getHost());
|
|
container.setHostname(status.getHost());
|
|
- if (timelineServiceEnabled) {
|
|
|
|
|
|
+ if (existingIP != null && newIP.equals(existingIP)) {
|
|
|
|
+ doRegistryUpdate = false;
|
|
|
|
+ }
|
|
|
|
+ if (timelineServiceEnabled && doRegistryUpdate) {
|
|
serviceTimelinePublisher.componentInstanceIPHostUpdated(container);
|
|
serviceTimelinePublisher.componentInstanceIPHostUpdated(container);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- updateServiceRecord(yarnRegistryOperations, status);
|
|
|
|
|
|
+ if (doRegistryUpdate) {
|
|
|
|
+ cleanupRegistry(status.getContainerId());
|
|
|
|
+ LOG.info(
|
|
|
|
+ getCompInstanceId() + " new IP = " + status.getIPs() + ", host = "
|
|
|
|
+ + status.getHost() + ", updating registry");
|
|
|
|
+ updateServiceRecord(yarnRegistryOperations, status);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
public String getCompName() {
|
|
public String getCompName() {
|
|
@@ -522,12 +544,15 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
|
private NodeId nodeId;
|
|
private NodeId nodeId;
|
|
private NMClient nmClient;
|
|
private NMClient nmClient;
|
|
private ComponentInstance instance;
|
|
private ComponentInstance instance;
|
|
|
|
+ private boolean cancelOnSuccess;
|
|
ContainerStatusRetriever(ServiceScheduler scheduler,
|
|
ContainerStatusRetriever(ServiceScheduler scheduler,
|
|
- ContainerId containerId, ComponentInstance instance) {
|
|
|
|
|
|
+ ContainerId containerId, ComponentInstance instance, boolean
|
|
|
|
+ cancelOnSuccess) {
|
|
this.containerId = containerId;
|
|
this.containerId = containerId;
|
|
this.nodeId = instance.getNodeId();
|
|
this.nodeId = instance.getNodeId();
|
|
this.nmClient = scheduler.getNmClient().getClient();
|
|
this.nmClient = scheduler.getNmClient().getClient();
|
|
this.instance = instance;
|
|
this.instance = instance;
|
|
|
|
+ this.cancelOnSuccess = cancelOnSuccess;
|
|
}
|
|
}
|
|
@Override public void run() {
|
|
@Override public void run() {
|
|
ContainerStatus status = null;
|
|
ContainerStatus status = null;
|
|
@@ -548,10 +573,12 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
instance.updateContainerStatus(status);
|
|
instance.updateContainerStatus(status);
|
|
- LOG.info(
|
|
|
|
- instance.compInstanceId + " IP = " + status.getIPs() + ", host = "
|
|
|
|
- + status.getHost() + ", cancel container status retriever");
|
|
|
|
- instance.containerStatusFuture.cancel(false);
|
|
|
|
|
|
+ if (cancelOnSuccess) {
|
|
|
|
+ LOG.info(
|
|
|
|
+ instance.compInstanceId + " IP = " + status.getIPs() + ", host = "
|
|
|
|
+ + status.getHost() + ", cancel container status retriever");
|
|
|
|
+ instance.containerStatusFuture.cancel(false);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|