|
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|
|
+import org.apache.hadoop.yarn.api.records.LocalizationState;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
import org.apache.hadoop.yarn.client.api.NMClient;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
@@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.service.ServiceScheduler;
|
|
|
import org.apache.hadoop.yarn.service.api.records.Artifact;
|
|
|
import org.apache.hadoop.yarn.service.api.records.ComponentState;
|
|
|
import org.apache.hadoop.yarn.service.api.records.ContainerState;
|
|
|
+import org.apache.hadoop.yarn.service.api.records.LocalizationStatus;
|
|
|
import org.apache.hadoop.yarn.service.api.records.ServiceState;
|
|
|
import org.apache.hadoop.yarn.service.component.Component;
|
|
|
import org.apache.hadoop.yarn.service.component.ComponentEvent;
|
|
@@ -51,6 +53,7 @@ import org.apache.hadoop.yarn.service.component.ComponentEventType;
|
|
|
import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy;
|
|
|
import org.apache.hadoop.yarn.service.monitor.probe.DefaultProbe;
|
|
|
import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus;
|
|
|
+import org.apache.hadoop.yarn.service.provider.ProviderService;
|
|
|
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
|
|
|
import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
|
|
|
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
|
|
@@ -65,10 +68,14 @@ import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.text.MessageFormat;
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.Date;
|
|
|
import java.util.EnumSet;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.concurrent.ExecutionException;
|
|
|
+import java.util.concurrent.Future;
|
|
|
import java.util.concurrent.ScheduledFuture;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
@@ -115,6 +122,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
|
|
private String serviceVersion;
|
|
|
private AtomicBoolean upgradeInProgress = new AtomicBoolean(false);
|
|
|
private boolean pendingCancelUpgrade = false;
|
|
|
+ private ProviderService.ResolvedLaunchParams resolvedParams;
|
|
|
+ private ScheduledFuture lclizationRetrieverFuture;
|
|
|
|
|
|
private static final StateMachineFactory<ComponentInstance,
|
|
|
ComponentInstanceState, ComponentInstanceEventType,
|
|
@@ -192,6 +201,9 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
|
|
ComponentInstanceEvent event) {
|
|
|
// Query container status for ip and host
|
|
|
compInstance.initializeStatusRetriever(event, 0);
|
|
|
+ compInstance.initializeLocalizationStatusRetriever(
|
|
|
+ event.getContainerId());
|
|
|
+
|
|
|
long containerStartTime = System.currentTimeMillis();
|
|
|
try {
|
|
|
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
|
|
@@ -277,6 +289,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
|
|
} else {
|
|
|
instance.initializeStatusRetriever(event, 0);
|
|
|
}
|
|
|
+ instance.initializeLocalizationStatusRetriever(event.getContainerId());
|
|
|
|
|
|
Component.UpgradeStatus status = instance.getState().equals(UPGRADING) ?
|
|
|
instance.component.getUpgradeStatus() :
|
|
@@ -292,6 +305,17 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
|
|
if (timelineServiceEnabled) {
|
|
|
serviceTimelinePublisher.componentInstanceBecomeReady(containerSpec);
|
|
|
}
|
|
|
+ try {
|
|
|
+ List<org.apache.hadoop.yarn.api.records.LocalizationStatus>
|
|
|
+ statusesFromNM = scheduler.getNmClient().getClient()
|
|
|
+ .getLocalizationStatuses(container.getId(), container.getNodeId());
|
|
|
+ if (statusesFromNM != null && !statusesFromNM.isEmpty()) {
|
|
|
+ updateLocalizationStatuses(statusesFromNM);
|
|
|
+ }
|
|
|
+ } catch (YarnException | IOException e) {
|
|
|
+ LOG.warn("{} failure getting localization statuses", container.getId(),
|
|
|
+ e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private static class ContainerBecomeNotReadyTransition extends BaseTransition {
|
|
@@ -411,6 +435,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
|
|
(status != null ? status.getDiagnostics() : UPGRADE_FAILED));
|
|
|
compInstance.diagnostics.append(containerDiag + System.lineSeparator());
|
|
|
compInstance.cancelContainerStatusRetriever();
|
|
|
+ compInstance.cancelLclRetriever();
|
|
|
|
|
|
if (compInstance.getState().equals(READY)) {
|
|
|
compInstance.component.decContainersReady(true);
|
|
@@ -639,13 +664,16 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
|
|
|
|
|
private void reInitHelper(Component.UpgradeStatus upgradeStatus) {
|
|
|
cancelContainerStatusRetriever();
|
|
|
+ cancelLclRetriever();
|
|
|
setContainerStatus(container.getId(), null);
|
|
|
scheduler.executorService.submit(() -> cleanupRegistry(container.getId()));
|
|
|
- scheduler.getContainerLaunchService()
|
|
|
+ Future<ProviderService.ResolvedLaunchParams> launchParamsFuture =
|
|
|
+ scheduler.getContainerLaunchService()
|
|
|
.reInitCompInstance(scheduler.getApp(), this,
|
|
|
this.container, this.component.createLaunchContext(
|
|
|
upgradeStatus.getTargetSpec(),
|
|
|
upgradeStatus.getTargetVersion()));
|
|
|
+ updateResolvedLaunchParams(launchParamsFuture);
|
|
|
}
|
|
|
|
|
|
private void initializeStatusRetriever(ComponentInstanceEvent event,
|
|
@@ -750,6 +778,61 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
|
|
return compInstanceId.getCompInstanceName();
|
|
|
}
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
+ void updateLocalizationStatuses(
|
|
|
+ List<org.apache.hadoop.yarn.api.records.LocalizationStatus> statuses) {
|
|
|
+ Map<String, String> resourcesCpy = new HashMap<>();
|
|
|
+ try {
|
|
|
+ readLock.lock();
|
|
|
+ if (resolvedParams == null || resolvedParams.didLaunchFail() ||
|
|
|
+ resolvedParams.getResolvedRsrcPaths() == null ||
|
|
|
+ resolvedParams.getResolvedRsrcPaths().isEmpty()) {
|
|
|
+ cancelLclRetriever();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ resourcesCpy.putAll(resolvedParams.getResolvedRsrcPaths());
|
|
|
+ } finally {
|
|
|
+ readLock.unlock();
|
|
|
+ }
|
|
|
+ boolean allCompleted = true;
|
|
|
+ Map<String, LocalizationStatus> fromNM = new HashMap<>();
|
|
|
+ statuses.forEach(statusFromNM -> {
|
|
|
+ LocalizationStatus lstatus = new LocalizationStatus()
|
|
|
+ .destFile(statusFromNM.getResourceKey())
|
|
|
+ .diagnostics(statusFromNM.getDiagnostics())
|
|
|
+ .state(statusFromNM.getLocalizationState());
|
|
|
+ fromNM.put(statusFromNM.getResourceKey(), lstatus);
|
|
|
+ });
|
|
|
+
|
|
|
+ for (String resourceKey : resourcesCpy.keySet()) {
|
|
|
+ LocalizationStatus lstatus = fromNM.get(resourceKey);
|
|
|
+ if (lstatus == null ||
|
|
|
+ lstatus.getState().equals(LocalizationState.PENDING)) {
|
|
|
+ allCompleted = false;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ List<LocalizationStatus> statusList = new ArrayList<>();
|
|
|
+ statusList.addAll(fromNM.values());
|
|
|
+ this.containerSpec.setLocalizationStatuses(statusList);
|
|
|
+ if (allCompleted) {
|
|
|
+ cancelLclRetriever();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void updateResolvedLaunchParams(
|
|
|
+ Future<ProviderService.ResolvedLaunchParams> future) {
|
|
|
+ try {
|
|
|
+ writeLock.lock();
|
|
|
+ this.resolvedParams = future.get();
|
|
|
+ } catch (InterruptedException | ExecutionException e) {
|
|
|
+ LOG.error("{} updating resolved params", getCompInstanceId(), e);
|
|
|
+ } finally {
|
|
|
+ writeLock.unlock();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public ContainerStatus getContainerStatus() {
|
|
|
try {
|
|
|
readLock.lock();
|
|
@@ -916,6 +999,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
|
|
cancelContainerStatusRetriever();
|
|
|
scheduler.executorService.submit(() ->
|
|
|
cleanupRegistryAndCompHdfsDir(containerId));
|
|
|
+ cancelLclRetriever();
|
|
|
}
|
|
|
|
|
|
private void cleanupRegistry(ContainerId containerId) {
|
|
@@ -998,6 +1082,61 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private static class LocalizationStatusRetriever implements Runnable {
|
|
|
+ private ContainerId containerId;
|
|
|
+ private NodeId nodeId;
|
|
|
+ private NMClient nmClient;
|
|
|
+ private ComponentInstance instance;
|
|
|
+
|
|
|
+ LocalizationStatusRetriever(ServiceScheduler scheduler,
|
|
|
+ ContainerId containerId, ComponentInstance instance) {
|
|
|
+ this.nmClient = scheduler.getNmClient().getClient();
|
|
|
+ this.containerId = containerId;
|
|
|
+ this.instance = instance;
|
|
|
+ this.nodeId = instance.getNodeId();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ List<org.apache.hadoop.yarn.api.records.LocalizationStatus>
|
|
|
+ statusesFromNM = null;
|
|
|
+ try {
|
|
|
+ statusesFromNM = nmClient.getLocalizationStatuses(containerId,
|
|
|
+ nodeId);
|
|
|
+ } catch (YarnException | IOException e) {
|
|
|
+ LOG.error("{} Failed to get localization statuses for {} {} ",
|
|
|
+ instance.compInstanceId, nodeId, containerId, e);
|
|
|
+ }
|
|
|
+ if (statusesFromNM != null && !statusesFromNM.isEmpty()) {
|
|
|
+ instance.updateLocalizationStatuses(statusesFromNM);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void initializeLocalizationStatusRetriever(
|
|
|
+ ContainerId containerId) {
|
|
|
+ LOG.info("{} retrieve localization statuses", compInstanceId);
|
|
|
+ lclizationRetrieverFuture = scheduler.executorService.scheduleAtFixedRate(
|
|
|
+ new LocalizationStatusRetriever(scheduler, containerId, this),
|
|
|
+ 0, 1, TimeUnit.SECONDS
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ private void cancelLclRetriever() {
|
|
|
+ if (lclizationRetrieverFuture != null &&
|
|
|
+ !lclizationRetrieverFuture.isDone()) {
|
|
|
+ LOG.info("{} cancelling localization retriever", compInstanceId);
|
|
|
+ lclizationRetrieverFuture.cancel(true);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ boolean isLclRetrieverActive() {
|
|
|
+ return lclizationRetrieverFuture != null &&
|
|
|
+ !lclizationRetrieverFuture.isCancelled()
|
|
|
+ && !lclizationRetrieverFuture.isDone();
|
|
|
+ }
|
|
|
+
|
|
|
public String getHostname() {
|
|
|
return getCompInstanceName() + getComponent().getHostnameSuffix();
|
|
|
}
|