浏览代码

YARN-8298. Added express upgrade for YARN service.
Contributed by Chandni Singh

Eric Yang 6 年之前
父节点
当前提交
e557c6bd8d
共有 19 个文件被更改,包括 661 次插入190 次删除
  1. 20 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
  2. 9 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
  3. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
  4. 25 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
  5. 99 28
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
  6. 12 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
  7. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
  8. 70 30
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
  9. 13 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
  10. 10 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
  11. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
  12. 44 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
  13. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
  14. 188 111
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
  15. 35 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
  16. 96 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/utils/TestServiceApiUtil.java
  17. 16 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
  18. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
  19. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java

+ 20 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java

@@ -600,6 +600,26 @@ public class ApiServiceClient extends AppAdminClient {
     return output;
   }
 
+  @Override
+  public int actionUpgradeExpress(String appName, File path)
+      throws IOException, YarnException {
+    int result;
+    try {
+      Service service =
+          loadAppJsonFromLocalFS(path.getAbsolutePath(), appName, null, null);
+      service.setState(ServiceState.EXPRESS_UPGRADING);
+      String buffer = jsonSerDeser.toJson(service);
+      LOG.info("Upgrade in progress. Please wait..");
+      ClientResponse response = getApiClient(getServicePath(appName))
+          .put(ClientResponse.class, buffer);
+      result = processResponse(response);
+    } catch (Exception e) {
+      LOG.error("Failed to upgrade application: ", e);
+      result = EXIT_EXCEPTION_THROWN;
+    }
+    return result;
+  }
+
   @Override
   public int initiateUpgrade(String appName,
       String fileName, boolean autoFinalize) throws IOException, YarnException {

+ 9 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java

@@ -440,7 +440,8 @@ public class ApiServer {
       if (updateServiceData.getState() != null && (
           updateServiceData.getState() == ServiceState.UPGRADING ||
               updateServiceData.getState() ==
-                  ServiceState.UPGRADING_AUTO_FINALIZE)) {
+                  ServiceState.UPGRADING_AUTO_FINALIZE) ||
+          updateServiceData.getState() == ServiceState.EXPRESS_UPGRADING) {
         return upgradeService(updateServiceData, ugi);
       }
 
@@ -690,7 +691,11 @@ public class ApiServer {
       ServiceClient sc = getServiceClient();
       sc.init(YARN_CONFIG);
       sc.start();
-      sc.initiateUpgrade(service);
+      if (service.getState().equals(ServiceState.EXPRESS_UPGRADING)) {
+        sc.actionUpgradeExpress(service);
+      } else {
+        sc.initiateUpgrade(service);
+      }
       sc.close();
       return null;
     });
@@ -706,7 +711,8 @@ public class ApiServer {
       String serviceName, Set<String> compNames) throws YarnException,
       IOException, InterruptedException {
     Service service = getServiceFromClient(ugi, serviceName);
-    if (service.getState() != ServiceState.UPGRADING) {
+    if (!service.getState().equals(ServiceState.UPGRADING) &&
+        !service.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) {
       throw new YarnException(
           String.format("The upgrade of service %s has not been initiated.",
               service.getName()));

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java

@@ -166,7 +166,7 @@ public class ClientAMService extends AbstractService
       LOG.info("Upgrading service to version {} by {}", request.getVersion(),
           UserGroupInformation.getCurrentUser());
       context.getServiceManager().processUpgradeRequest(request.getVersion(),
-          request.getAutoFinalize());
+          request.getAutoFinalize(), request.getExpressUpgrade());
       return UpgradeServiceResponseProto.newBuilder().build();
     } catch (Exception ex) {
       return UpgradeServiceResponseProto.newBuilder().setError(ex.getMessage())

+ 25 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java

@@ -19,6 +19,9 @@
 package org.apache.hadoop.yarn.service;
 
 import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.service.api.records.Component;
+
+import java.util.Queue;
 
 /**
  * Events are handled by {@link ServiceManager} to manage the service
@@ -29,6 +32,8 @@ public class ServiceEvent extends AbstractEvent<ServiceEventType> {
   private final ServiceEventType type;
   private String version;
   private boolean autoFinalize;
+  private boolean expressUpgrade;
+  private Queue<Component> compsToUpgradeInOrder;
 
   public ServiceEvent(ServiceEventType serviceEventType) {
     super(serviceEventType);
@@ -56,4 +61,24 @@ public class ServiceEvent extends AbstractEvent<ServiceEventType> {
     this.autoFinalize = autoFinalize;
     return this;
   }
+
+  public boolean isExpressUpgrade() {
+    return expressUpgrade;
+  }
+
+  public ServiceEvent setExpressUpgrade(boolean expressUpgrade) {
+    this.expressUpgrade = expressUpgrade;
+    return this;
+  }
+
+  public Queue<Component> getCompsToUpgradeInOrder() {
+    return compsToUpgradeInOrder;
+  }
+
+  public ServiceEvent setCompsToUpgradeInOrder(
+      Queue<Component> compsToUpgradeInOrder) {
+    this.compsToUpgradeInOrder = compsToUpgradeInOrder;
+    return this;
+  }
+
 }

+ 99 - 28
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java

@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.api.records.ComponentState;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.ServiceState;
 import org.apache.hadoop.yarn.service.component.Component;
@@ -40,8 +41,11 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.text.MessageFormat;
 import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser;
@@ -67,6 +71,8 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
   private final SliderFileSystem fs;
 
   private String upgradeVersion;
+  private Queue<org.apache.hadoop.yarn.service.api.records
+        .Component> compsToUpgradeInOrder;
 
   private static final StateMachineFactory<ServiceManager, State,
       ServiceEventType, ServiceEvent> STATE_MACHINE_FACTORY =
@@ -141,14 +147,20 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
     @Override
     public State transition(ServiceManager serviceManager,
         ServiceEvent event) {
+      serviceManager.upgradeVersion = event.getVersion();
       try {
-        if (!event.isAutoFinalize()) {
-          serviceManager.serviceSpec.setState(ServiceState.UPGRADING);
+        if (event.isExpressUpgrade()) {
+          serviceManager.serviceSpec.setState(ServiceState.EXPRESS_UPGRADING);
+          serviceManager.compsToUpgradeInOrder = event
+              .getCompsToUpgradeInOrder();
+          serviceManager.upgradeNextCompIfAny();
+        } else if (event.isAutoFinalize()) {
+          serviceManager.serviceSpec.setState(ServiceState
+              .UPGRADING_AUTO_FINALIZE);
         } else {
           serviceManager.serviceSpec.setState(
-              ServiceState.UPGRADING_AUTO_FINALIZE);
+              ServiceState.UPGRADING);
         }
-        serviceManager.upgradeVersion = event.getVersion();
         return State.UPGRADING;
       } catch (Throwable e) {
         LOG.error("[SERVICE]: Upgrade to version {} failed", event.getVersion(),
@@ -169,8 +181,19 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
       if (currState.equals(ServiceState.STABLE)) {
         return State.STABLE;
       }
+      if (currState.equals(ServiceState.EXPRESS_UPGRADING)) {
+        org.apache.hadoop.yarn.service.api.records.Component component =
+            serviceManager.compsToUpgradeInOrder.peek();
+        if (!component.getState().equals(ComponentState.NEEDS_UPGRADE) &&
+            !component.getState().equals(ComponentState.UPGRADING)) {
+          serviceManager.compsToUpgradeInOrder.remove();
+        }
+        serviceManager.upgradeNextCompIfAny();
+      }
       if (currState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) ||
-          event.getType().equals(ServiceEventType.START)) {
+          event.getType().equals(ServiceEventType.START) ||
+          (currState.equals(ServiceState.EXPRESS_UPGRADING) &&
+              serviceManager.compsToUpgradeInOrder.isEmpty())) {
         ServiceState targetState = checkIfStable(serviceManager.serviceSpec);
         if (targetState.equals(ServiceState.STABLE)) {
           if (serviceManager.finalizeUpgrade()) {
@@ -184,6 +207,19 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
     }
   }
 
+  private void upgradeNextCompIfAny() {
+    if (!compsToUpgradeInOrder.isEmpty()) {
+      org.apache.hadoop.yarn.service.api.records.Component component =
+          compsToUpgradeInOrder.peek();
+
+      ComponentEvent needUpgradeEvent = new ComponentEvent(
+          component.getName(), ComponentEventType.UPGRADE).setTargetSpec(
+          component).setUpgradeVersion(upgradeVersion).setExpressUpgrade(true);
+      context.scheduler.getDispatcher().getEventHandler().handle(
+          needUpgradeEvent);
+    }
+  }
+
   /**
    * @return whether finalization of upgrade was successful.
    */
@@ -250,23 +286,18 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
   }
 
   void processUpgradeRequest(String upgradeVersion,
-      boolean autoFinalize) throws IOException {
+      boolean autoFinalize, boolean expressUpgrade) throws IOException {
     Service targetSpec = ServiceApiUtil.loadServiceUpgrade(
         context.fs, context.service.getName(), upgradeVersion);
 
     List<org.apache.hadoop.yarn.service.api.records.Component>
-        compsThatNeedUpgrade = componentsFinder.
+        compsNeedUpgradeList = componentsFinder.
         findTargetComponentSpecs(context.service, targetSpec);
-    ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE)
-        .setVersion(upgradeVersion)
-        .setAutoFinalize(autoFinalize);
-    context.scheduler.getDispatcher().getEventHandler().handle(event);
 
-    if (compsThatNeedUpgrade != null && !compsThatNeedUpgrade.isEmpty()) {
-      if (autoFinalize) {
-        event.setAutoFinalize(true);
-      }
-      compsThatNeedUpgrade.forEach(component -> {
+    // remove all components from need upgrade list if there restart policy
+    // doesn't all upgrade.
+    if (compsNeedUpgradeList != null) {
+      compsNeedUpgradeList.removeIf(component -> {
         org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum
             restartPolicy = component.getRestartPolicy();
 
@@ -274,25 +305,65 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
             Component.getRestartPolicyHandler(restartPolicy);
         // Do not allow upgrades for components which have NEVER/ON_FAILURE
         // restart policy
-        if (restartPolicyHandler.allowUpgrades()) {
+        if (!restartPolicyHandler.allowUpgrades()) {
+          LOG.info("The component {} has a restart policy that doesnt " +
+                  "allow upgrades {} ", component.getName(),
+              component.getRestartPolicy().toString());
+          return true;
+        }
+
+        return false;
+      });
+    }
+
+    ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE)
+        .setVersion(upgradeVersion)
+        .setAutoFinalize(autoFinalize)
+        .setExpressUpgrade(expressUpgrade);
+
+    if (expressUpgrade) {
+      // In case of express upgrade  components need to be upgraded in order.
+      // Once the service manager gets notified that a component finished
+      // upgrading, it then issues event to upgrade the next component.
+      Map<String, org.apache.hadoop.yarn.service.api.records.Component>
+          compsNeedUpgradeByName = new HashMap<>();
+      if (compsNeedUpgradeList != null) {
+        compsNeedUpgradeList.forEach(component ->
+            compsNeedUpgradeByName.put(component.getName(), component));
+      }
+      List<String> resolvedComps = ServiceApiUtil
+          .resolveCompsDependency(targetSpec);
+
+      Queue<org.apache.hadoop.yarn.service.api.records.Component>
+          orderedCompUpgrade = new LinkedList<>();
+      resolvedComps.forEach(compName -> {
+        org.apache.hadoop.yarn.service.api.records.Component component =
+            compsNeedUpgradeByName.get(compName);
+        if (component != null ) {
+          orderedCompUpgrade.add(component);
+        }
+      });
+      event.setCompsToUpgradeInOrder(orderedCompUpgrade);
+    }
+
+    context.scheduler.getDispatcher().getEventHandler().handle(event);
+
+    if (compsNeedUpgradeList != null && !compsNeedUpgradeList.isEmpty()) {
+      if (!expressUpgrade) {
+        compsNeedUpgradeList.forEach(component -> {
           ComponentEvent needUpgradeEvent = new ComponentEvent(
               component.getName(), ComponentEventType.UPGRADE).setTargetSpec(
               component).setUpgradeVersion(event.getVersion());
           context.scheduler.getDispatcher().getEventHandler().handle(
               needUpgradeEvent);
-        } else {
-          LOG.info("The component {} has a restart "
-              + "policy that doesnt allow upgrades {} ", component.getName(),
-              component.getRestartPolicy().toString());
-        }
-      });
-    } else {
+
+        });
+      }
+    }  else if (autoFinalize) {
       // nothing to upgrade if upgrade auto finalize is requested, trigger a
       // state check.
-      if (autoFinalize) {
-        context.scheduler.getDispatcher().getEventHandler().handle(
-            new ServiceEvent(ServiceEventType.CHECK_STABLE));
-      }
+      context.scheduler.getDispatcher().getEventHandler().handle(
+          new ServiceEvent(ServiceEventType.CHECK_STABLE));
     }
   }
 

+ 12 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java

@@ -219,7 +219,7 @@ public class ServiceScheduler extends CompositeService {
     nmClient.getClient().cleanupRunningContainersOnStop(false);
     addIfService(nmClient);
 
-    dispatcher = new AsyncDispatcher("Component  dispatcher");
+    dispatcher = createAsyncDispatcher();
     dispatcher.register(ServiceEventType.class, new ServiceEventHandler());
     dispatcher.register(ComponentEventType.class,
         new ComponentEventHandler());
@@ -253,6 +253,9 @@ public class ServiceScheduler extends CompositeService {
         YarnServiceConf.CONTAINER_RECOVERY_TIMEOUT_MS,
         YarnServiceConf.DEFAULT_CONTAINER_RECOVERY_TIMEOUT_MS,
         app.getConfiguration(), getConfig());
+
+    serviceManager = createServiceManager();
+    context.setServiceManager(serviceManager);
   }
 
   protected YarnRegistryViewForProviders createYarnRegistryOperations(
@@ -262,6 +265,14 @@ public class ServiceScheduler extends CompositeService {
         context.attemptId);
   }
 
+  protected ServiceManager createServiceManager() {
+    return new ServiceManager(context);
+  }
+
+  protected AsyncDispatcher createAsyncDispatcher() {
+    return new AsyncDispatcher("Component  dispatcher");
+  }
+
   protected NMClientAsync createNMClient() {
     return NMClientAsync.createNMClientAsync(new NMClientCallback());
   }
@@ -344,8 +355,6 @@ public class ServiceScheduler extends CompositeService {
 
     // Since AM has been started and registered, the service is in STARTED state
     app.setState(ServiceState.STARTED);
-    serviceManager = new ServiceManager(context);
-    context.setServiceManager(serviceManager);
 
     // recover components based on containers sent from RM
     recoverComponents(response);

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java

@@ -30,5 +30,5 @@ import org.apache.hadoop.classification.InterfaceStability;
 @javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00")
 public enum ServiceState {
   ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING,
-  UPGRADING_AUTO_FINALIZE;
+  UPGRADING_AUTO_FINALIZE, EXPRESS_UPGRADING;
 }

+ 70 - 30
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.service.client;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -215,48 +216,31 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
     return EXIT_SUCCESS;
   }
 
-  @Override
-  public int initiateUpgrade(String appName, String fileName,
-      boolean autoFinalize)
-      throws IOException, YarnException {
-    Service upgradeService = loadAppJsonFromLocalFS(fileName, appName,
-        null, null);
-    if (autoFinalize) {
-      upgradeService.setState(ServiceState.UPGRADING_AUTO_FINALIZE);
-    } else {
-      upgradeService.setState(ServiceState.UPGRADING);
-    }
-    return initiateUpgrade(upgradeService);
-  }
-
-  public int initiateUpgrade(Service service) throws YarnException,
-      IOException {
+  private ApplicationReport upgradePrecheck(Service service)
+      throws YarnException, IOException {
     boolean upgradeEnabled = getConfig().getBoolean(
-        YARN_SERVICE_UPGRADE_ENABLED,
-        YARN_SERVICE_UPGRADE_ENABLED_DEFAULT);
+        YARN_SERVICE_UPGRADE_ENABLED, YARN_SERVICE_UPGRADE_ENABLED_DEFAULT);
     if (!upgradeEnabled) {
       throw new YarnException(ErrorStrings.SERVICE_UPGRADE_DISABLED);
     }
-    Service persistedService =
-        ServiceApiUtil.loadService(fs, service.getName());
+    Service persistedService = ServiceApiUtil.loadService(fs,
+        service.getName());
     if (!StringUtils.isEmpty(persistedService.getId())) {
-      cachedAppInfo.put(persistedService.getName(), new AppInfo(
-          ApplicationId.fromString(persistedService.getId()),
-          persistedService.getKerberosPrincipal().getPrincipalName()));
+      cachedAppInfo.put(persistedService.getName(),
+          new AppInfo(ApplicationId.fromString(persistedService.getId()),
+              persistedService.getKerberosPrincipal().getPrincipalName()));
     }
 
     if (persistedService.getVersion().equals(service.getVersion())) {
-      String message =
-          service.getName() + " is already at version " + service.getVersion()
-              + ". There is nothing to upgrade.";
+      String message = service.getName() + " is already at version "
+          + service.getVersion() + ". There is nothing to upgrade.";
       LOG.error(message);
       throw new YarnException(message);
     }
 
     Service liveService = getStatus(service.getName());
     if (!liveService.getState().equals(ServiceState.STABLE)) {
-      String message = service.getName() + " is at " +
-          liveService.getState()
+      String message = service.getName() + " is at " + liveService.getState()
           + " state and upgrade can only be initiated when service is STABLE.";
       LOG.error(message);
       throw new YarnException(message);
@@ -266,11 +250,67 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
     ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
     ServiceApiUtil.createDirAndPersistApp(fs, serviceUpgradeDir, service);
 
-    ApplicationReport appReport =
-        yarnClient.getApplicationReport(getAppId(service.getName()));
+    ApplicationReport appReport = yarnClient
+        .getApplicationReport(getAppId(service.getName()));
     if (StringUtils.isEmpty(appReport.getHost())) {
       throw new YarnException(service.getName() + " AM hostname is empty");
     }
+    return appReport;
+  }
+
+  @Override
+  public int actionUpgradeExpress(String appName, File path)
+      throws IOException, YarnException {
+    Service service =
+        loadAppJsonFromLocalFS(path.getAbsolutePath(), appName, null, null);
+    service.setState(ServiceState.UPGRADING_AUTO_FINALIZE);
+    actionUpgradeExpress(service);
+    return EXIT_SUCCESS;
+  }
+
+  public int actionUpgradeExpress(Service service) throws YarnException,
+      IOException {
+    ApplicationReport appReport = upgradePrecheck(service);
+    ClientAMProtocol proxy = createAMProxy(service.getName(), appReport);
+    UpgradeServiceRequestProto.Builder requestBuilder =
+        UpgradeServiceRequestProto.newBuilder();
+    requestBuilder.setVersion(service.getVersion());
+    if (service.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) {
+      requestBuilder.setAutoFinalize(true);
+    }
+    if (service.getState().equals(ServiceState.EXPRESS_UPGRADING)) {
+      requestBuilder.setExpressUpgrade(true);
+      requestBuilder.setAutoFinalize(true);
+    }
+    UpgradeServiceResponseProto responseProto = proxy.upgrade(
+        requestBuilder.build());
+    if (responseProto.hasError()) {
+      LOG.error("Service {} express upgrade to version {} failed because {}",
+          service.getName(), service.getVersion(), responseProto.getError());
+      throw new YarnException("Failed to express upgrade service " +
+          service.getName() + " to version " + service.getVersion() +
+          " because " + responseProto.getError());
+    }
+    return EXIT_SUCCESS;
+  }
+
+  @Override
+  public int initiateUpgrade(String appName, String fileName,
+      boolean autoFinalize)
+      throws IOException, YarnException {
+    Service upgradeService = loadAppJsonFromLocalFS(fileName, appName,
+        null, null);
+    if (autoFinalize) {
+      upgradeService.setState(ServiceState.UPGRADING_AUTO_FINALIZE);
+    } else {
+      upgradeService.setState(ServiceState.UPGRADING);
+    }
+    return initiateUpgrade(upgradeService);
+  }
+
+  public int initiateUpgrade(Service service) throws YarnException,
+      IOException {
+    ApplicationReport appReport = upgradePrecheck(service);
     ClientAMProtocol proxy = createAMProxy(service.getName(), appReport);
 
     UpgradeServiceRequestProto.Builder requestBuilder =

+ 13 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.service.component;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import static org.apache.hadoop.yarn.service.api.records.Component
@@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.service.ServiceEventType;
 import org.apache.hadoop.yarn.service.api.records.ContainerState;
 import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
 import org.apache.hadoop.yarn.service.ContainerFailureTracker;
 import org.apache.hadoop.yarn.service.ServiceContext;
@@ -546,13 +548,21 @@ public class Component implements EventHandler<ComponentEvent> {
     @Override
     public void transition(Component component, ComponentEvent event) {
       component.upgradeInProgress.set(true);
+      component.upgradeEvent = event;
       component.componentSpec.setState(org.apache.hadoop.yarn.service.api.
           records.ComponentState.NEEDS_UPGRADE);
       component.numContainersThatNeedUpgrade.set(
           component.componentSpec.getNumberOfContainers());
-      component.componentSpec.getContainers().forEach(container ->
-          container.setState(ContainerState.NEEDS_UPGRADE));
-      component.upgradeEvent = event;
+      component.componentSpec.getContainers().forEach(container -> {
+        container.setState(ContainerState.NEEDS_UPGRADE);
+        if (event.isExpressUpgrade()) {
+          ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent(
+              ContainerId.fromString(container.getId()),
+                  ComponentInstanceEventType.UPGRADE);
+          LOG.info("Upgrade container {}", container.getId());
+          component.dispatcher.getEventHandler().handle(upgradeEvent);
+        }
+      });
     }
   }
 

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java

@@ -35,6 +35,7 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> {
   private ContainerId containerId;
   private org.apache.hadoop.yarn.service.api.records.Component targetSpec;
   private String upgradeVersion;
+  private boolean expressUpgrade;
 
   public ContainerId getContainerId() {
     return containerId;
@@ -113,4 +114,13 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> {
     this.upgradeVersion = upgradeVersion;
     return this;
   }
+
+  public boolean isExpressUpgrade() {
+    return expressUpgrade;
+  }
+
+  public ComponentEvent setExpressUpgrade(boolean expressUpgrade) {
+    this.expressUpgrade = expressUpgrade;
+    return this;
+  }
 }

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java

@@ -380,6 +380,11 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
     @Override
     public void transition(ComponentInstance compInstance,
         ComponentInstanceEvent event) {
+      if (!compInstance.containerSpec.getState().equals(
+          ContainerState.NEEDS_UPGRADE)) {
+        //nothing to upgrade. this may happen with express upgrade.
+        return;
+      }
       compInstance.containerSpec.setState(ContainerState.UPGRADING);
       compInstance.component.decContainersReady(false);
       ComponentEvent upgradeEvent = compInstance.component.getUpgradeEvent();

+ 44 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java

@@ -638,6 +638,32 @@ public class ServiceApiUtil {
     return containerNeedUpgrade;
   }
 
+  /**
+   * Validates the components that are requested are stable for upgrade.
+   * It returns the instances of the components which are in ready state.
+   */
+  public static List<Container> validateAndResolveCompsStable(
+      Service liveService, Collection<String> compNames) throws YarnException {
+    Preconditions.checkNotNull(compNames);
+    HashSet<String> requestedComps = Sets.newHashSet(compNames);
+    List<Container> containerNeedUpgrade = new ArrayList<>();
+    for (Component liveComp : liveService.getComponents()) {
+      if (requestedComps.contains(liveComp.getName())) {
+        if (!liveComp.getState().equals(ComponentState.STABLE)) {
+          // Nothing to upgrade
+          throw new YarnException(String.format(
+              ERROR_COMP_DOES_NOT_NEED_UPGRADE, liveComp.getName()));
+        }
+        liveComp.getContainers().forEach(liveContainer -> {
+          if (liveContainer.getState().equals(ContainerState.READY)) {
+            containerNeedUpgrade.add(liveContainer);
+          }
+        });
+      }
+    }
+    return containerNeedUpgrade;
+  }
+
   private static String parseComponentName(String componentInstanceName)
       throws YarnException {
     int idx = componentInstanceName.lastIndexOf('-');
@@ -651,4 +677,22 @@ public class ServiceApiUtil {
   public static String $(String s) {
     return "${" + s +"}";
   }
+
+  public static List<String> resolveCompsDependency(Service service) {
+    List<String> components = new ArrayList<String>();
+    for (Component component : service.getComponents()) {
+      int depSize = component.getDependencies().size();
+      if (!components.contains(component.getName())) {
+        components.add(component.getName());
+      }
+      if (depSize != 0) {
+        for (String depComp : component.getDependencies()) {
+          if (!components.contains(depComp)) {
+            components.add(0, depComp);
+          }
+        }
+      }
+    }
+    return components;
+  }
 }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto

@@ -66,6 +66,7 @@ message StopResponseProto {
 message UpgradeServiceRequestProto {
   optional string version = 1;
   optional bool autoFinalize = 2;
+  optional bool expressUpgrade = 3;
 }
 
 message UpgradeServiceResponseProto {

+ 188 - 111
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java

@@ -19,23 +19,26 @@
 package org.apache.hadoop.yarn.service;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.service.api.records.Artifact;
 import org.apache.hadoop.yarn.service.api.records.ComponentState;
+import org.apache.hadoop.yarn.service.api.records.ContainerState;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.ServiceState;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
 import org.apache.hadoop.yarn.service.exceptions.SliderException;
-import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
 import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.Map;
-
-import static org.mockito.Mockito.mock;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
 
 /**
  * Tests for {@link ServiceManager}.
@@ -46,117 +49,120 @@ public class TestServiceManager {
   public ServiceTestUtils.ServiceFSWatcher rule =
       new ServiceTestUtils.ServiceFSWatcher();
 
-  @Test
-  public void testUpgrade() throws IOException, SliderException {
-    ServiceManager serviceManager = createTestServiceManager("testUpgrade");
-    upgrade(serviceManager, "v2", false, false);
+  @Test (timeout = TIMEOUT)
+  public void testUpgrade() throws Exception {
+    ServiceContext context = createServiceContext("testUpgrade");
+    initUpgrade(context, "v2", false, false, false);
     Assert.assertEquals("service not upgraded", ServiceState.UPGRADING,
-        serviceManager.getServiceSpec().getState());
+        context.getServiceManager().getServiceSpec().getState());
   }
 
-  @Test
+  @Test (timeout = TIMEOUT)
   public void testRestartNothingToUpgrade()
-      throws IOException, SliderException {
-    ServiceManager serviceManager = createTestServiceManager(
+      throws Exception {
+    ServiceContext context = createServiceContext(
         "testRestartNothingToUpgrade");
-    upgrade(serviceManager, "v2", false, false);
-
-    //make components stable
-    serviceManager.getServiceSpec().getComponents().forEach(comp -> {
-      comp.setState(ComponentState.STABLE);
-    });
-    serviceManager.handle(new ServiceEvent(ServiceEventType.START));
+    initUpgrade(context, "v2", false, false, false);
+    ServiceManager manager = context.getServiceManager();
+    //make components stable by upgrading all instances
+    upgradeAllInstances(context);
+
+    context.scheduler.getDispatcher().getEventHandler().handle(
+        new ServiceEvent(ServiceEventType.START));
+    GenericTestUtils.waitFor(()->
+        context.service.getState().equals(ServiceState.STABLE),
+        CHECK_EVERY_MILLIS, TIMEOUT);
     Assert.assertEquals("service not re-started", ServiceState.STABLE,
-        serviceManager.getServiceSpec().getState());
+        manager.getServiceSpec().getState());
   }
 
-  @Test
-  public void testAutoFinalizeNothingToUpgrade() throws IOException,
-      SliderException {
-    ServiceManager serviceManager = createTestServiceManager(
+  @Test(timeout = TIMEOUT)
+  public void testAutoFinalizeNothingToUpgrade() throws Exception {
+    ServiceContext context = createServiceContext(
         "testAutoFinalizeNothingToUpgrade");
-    upgrade(serviceManager, "v2", false, true);
-
-    //make components stable
-    serviceManager.getServiceSpec().getComponents().forEach(comp ->
-        comp.setState(ComponentState.STABLE));
-    serviceManager.handle(new ServiceEvent(ServiceEventType.CHECK_STABLE));
+    initUpgrade(context, "v2", false, true, false);
+    ServiceManager manager = context.getServiceManager();
+    //make components stable by upgrading all instances
+    upgradeAllInstances(context);
+
+    GenericTestUtils.waitFor(()->
+        context.service.getState().equals(ServiceState.STABLE),
+        CHECK_EVERY_MILLIS, TIMEOUT);
     Assert.assertEquals("service stable", ServiceState.STABLE,
-        serviceManager.getServiceSpec().getState());
+        manager.getServiceSpec().getState());
   }
 
-  @Test
+  @Test(timeout = TIMEOUT)
   public void testRestartWithPendingUpgrade()
-      throws IOException, SliderException {
-    ServiceManager serviceManager = createTestServiceManager("testRestart");
-    upgrade(serviceManager, "v2", true, false);
-    serviceManager.handle(new ServiceEvent(ServiceEventType.START));
+      throws Exception {
+    ServiceContext context = createServiceContext("testRestart");
+    initUpgrade(context, "v2", true, false, false);
+    ServiceManager manager = context.getServiceManager();
+
+    context.scheduler.getDispatcher().getEventHandler().handle(
+        new ServiceEvent(ServiceEventType.START));
+    context.scheduler.getDispatcher().stop();
     Assert.assertEquals("service should still be upgrading",
-        ServiceState.UPGRADING, serviceManager.getServiceSpec().getState());
+        ServiceState.UPGRADING, manager.getServiceSpec().getState());
   }
 
-  @Test
-  public void testCheckState() throws IOException, SliderException {
-    ServiceManager serviceManager = createTestServiceManager(
-        "testCheckState");
-    upgrade(serviceManager, "v2", true, false);
+  @Test(timeout = TIMEOUT)
+  public void testFinalize() throws Exception {
+    ServiceContext context = createServiceContext("testCheckState");
+    initUpgrade(context, "v2", true, false, false);
+    ServiceManager manager = context.getServiceManager();
     Assert.assertEquals("service not upgrading", ServiceState.UPGRADING,
-        serviceManager.getServiceSpec().getState());
+        manager.getServiceSpec().getState());
 
-    // make components stable
-    serviceManager.getServiceSpec().getComponents().forEach(comp -> {
-      comp.setState(ComponentState.STABLE);
-    });
-    ServiceEvent checkStable = new ServiceEvent(ServiceEventType.CHECK_STABLE);
-    serviceManager.handle(checkStable);
-    Assert.assertEquals("service should still be upgrading",
-        ServiceState.UPGRADING, serviceManager.getServiceSpec().getState());
+    //make components stable by upgrading all instances
+    upgradeAllInstances(context);
 
     // finalize service
-    ServiceEvent restart = new ServiceEvent(ServiceEventType.START);
-    serviceManager.handle(restart);
-    Assert.assertEquals("service not stable",
-        ServiceState.STABLE, serviceManager.getServiceSpec().getState());
+    context.scheduler.getDispatcher().getEventHandler().handle(
+        new ServiceEvent(ServiceEventType.START));
+    GenericTestUtils.waitFor(()->
+        context.service.getState().equals(ServiceState.STABLE),
+        CHECK_EVERY_MILLIS, TIMEOUT);
+    Assert.assertEquals("service not re-started", ServiceState.STABLE,
+        manager.getServiceSpec().getState());
 
-    validateUpgradeFinalization(serviceManager.getName(), "v2");
+    validateUpgradeFinalization(manager.getName(), "v2");
   }
 
-  @Test
-  public void testCheckStateAutoFinalize() throws IOException, SliderException {
-    ServiceManager serviceManager = createTestServiceManager(
-        "testCheckState");
-    serviceManager.getServiceSpec().setState(
+  @Test(timeout = TIMEOUT)
+  public void testAutoFinalize() throws Exception {
+    ServiceContext context = createServiceContext("testCheckStateAutoFinalize");
+    ServiceManager manager = context.getServiceManager();
+    manager.getServiceSpec().setState(
         ServiceState.UPGRADING_AUTO_FINALIZE);
-    upgrade(serviceManager, "v2", true, true);
-    Assert.assertEquals("service not upgrading",
-        ServiceState.UPGRADING_AUTO_FINALIZE,
-        serviceManager.getServiceSpec().getState());
+    initUpgrade(context, "v2", true, true, false);
 
     // make components stable
-    serviceManager.getServiceSpec().getComponents().forEach(comp ->
-        comp.setState(ComponentState.STABLE));
-    ServiceEvent checkStable = new ServiceEvent(ServiceEventType.CHECK_STABLE);
-    serviceManager.handle(checkStable);
+    upgradeAllInstances(context);
+
+    GenericTestUtils.waitFor(() ->
+        context.service.getState().equals(ServiceState.STABLE),
+        CHECK_EVERY_MILLIS, TIMEOUT);
     Assert.assertEquals("service not stable",
-        ServiceState.STABLE, serviceManager.getServiceSpec().getState());
+        ServiceState.STABLE, manager.getServiceSpec().getState());
 
-    validateUpgradeFinalization(serviceManager.getName(), "v2");
+    validateUpgradeFinalization(manager.getName(), "v2");
   }
 
   @Test
-  public void testInvalidUpgrade() throws IOException, SliderException {
-    ServiceManager serviceManager = createTestServiceManager(
-        "testInvalidUpgrade");
-    serviceManager.getServiceSpec().setState(
+  public void testInvalidUpgrade() throws Exception {
+    ServiceContext serviceContext = createServiceContext("testInvalidUpgrade");
+    ServiceManager manager = serviceContext.getServiceManager();
+    manager.getServiceSpec().setState(
         ServiceState.UPGRADING_AUTO_FINALIZE);
     Service upgradedDef = ServiceTestUtils.createExampleApplication();
-    upgradedDef.setName(serviceManager.getName());
+    upgradedDef.setName(manager.getName());
     upgradedDef.setVersion("v2");
     upgradedDef.setLifetime(2L);
     writeUpgradedDef(upgradedDef);
 
     try {
-      serviceManager.processUpgradeRequest("v2", true);
+      manager.processUpgradeRequest("v2", true, false);
     } catch (Exception ex) {
       Assert.assertTrue(ex instanceof UnsupportedOperationException);
       return;
@@ -164,6 +170,32 @@ public class TestServiceManager {
     Assert.fail();
   }
 
+  @Test(timeout = TIMEOUT)
+  public void testExpressUpgrade() throws Exception {
+    ServiceContext context = createServiceContext("testExpressUpgrade");
+    ServiceManager manager = context.getServiceManager();
+    manager.getServiceSpec().setState(
+        ServiceState.EXPRESS_UPGRADING);
+    initUpgrade(context, "v2", true, true, true);
+
+    List<String> comps = ServiceApiUtil.resolveCompsDependency(context.service);
+    // wait till instances of first component are in upgrade
+    String comp1 = comps.get(0);
+    upgradeInstancesOf(context, comp1);
+
+    // wait till instances of second component are in upgrade
+    String comp2 = comps.get(1);
+    upgradeInstancesOf(context, comp2);
+
+    GenericTestUtils.waitFor(() ->
+            context.service.getState().equals(ServiceState.STABLE),
+        CHECK_EVERY_MILLIS, TIMEOUT);
+
+    Assert.assertEquals("service not stable",
+        ServiceState.STABLE, manager.getServiceSpec().getState());
+    validateUpgradeFinalization(manager.getName(), "v2");
+  }
+
   private void validateUpgradeFinalization(String serviceName,
       String expectedVersion) throws IOException {
     Service savedSpec = ServiceApiUtil.loadService(rule.getFs(), serviceName);
@@ -172,15 +204,16 @@ public class TestServiceManager {
     Assert.assertNotNull("app id not present", savedSpec.getId());
     Assert.assertEquals("state not stable", ServiceState.STABLE,
         savedSpec.getState());
-    savedSpec.getComponents().forEach(compSpec -> {
-      Assert.assertEquals("comp not stable", ComponentState.STABLE,
-          compSpec.getState());
-    });
+    savedSpec.getComponents().forEach(compSpec ->
+        Assert.assertEquals("comp not stable", ComponentState.STABLE,
+        compSpec.getState()));
   }
 
-  private void upgrade(ServiceManager serviceManager, String version,
-      boolean upgradeArtifact, boolean autoFinalize)
-      throws IOException, SliderException {
+  private void initUpgrade(ServiceContext context, String version,
+      boolean upgradeArtifact, boolean autoFinalize, boolean expressUpgrade)
+      throws IOException, SliderException, TimeoutException,
+      InterruptedException {
+    ServiceManager serviceManager = context.getServiceManager();
     Service upgradedDef = ServiceTestUtils.createExampleApplication();
     upgradedDef.setName(serviceManager.getName());
     upgradedDef.setVersion(version);
@@ -191,39 +224,81 @@ public class TestServiceManager {
       });
     }
     writeUpgradedDef(upgradedDef);
-    serviceManager.processUpgradeRequest(version, autoFinalize);
+    serviceManager.processUpgradeRequest(version, autoFinalize, expressUpgrade);
     ServiceEvent upgradeEvent = new ServiceEvent(ServiceEventType.UPGRADE);
-    upgradeEvent.setVersion(version);
-    if (autoFinalize) {
-      upgradeEvent.setAutoFinalize(true);
-    }
-    serviceManager.handle(upgradeEvent);
+    upgradeEvent.setVersion(version).setExpressUpgrade(expressUpgrade)
+        .setAutoFinalize(autoFinalize);
+
+    GenericTestUtils.waitFor(()-> {
+      ServiceState serviceState = context.service.getState();
+      if (serviceState.equals(ServiceState.UPGRADING) ||
+          serviceState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) ||
+          serviceState.equals(ServiceState.EXPRESS_UPGRADING)) {
+        return true;
+      }
+      return false;
+    }, CHECK_EVERY_MILLIS, TIMEOUT);
+  }
+
+  private void upgradeAllInstances(ServiceContext context) throws
+      TimeoutException, InterruptedException {
+    // upgrade the instances
+    context.scheduler.getLiveInstances().forEach(((containerId, instance) -> {
+      ComponentInstanceEvent event = new ComponentInstanceEvent(containerId,
+          ComponentInstanceEventType.UPGRADE);
+      context.scheduler.getDispatcher().getEventHandler().handle(event);
+    }));
+
+    // become ready
+    context.scheduler.getLiveInstances().forEach(((containerId, instance) -> {
+      ComponentInstanceEvent event = new ComponentInstanceEvent(containerId,
+          ComponentInstanceEventType.BECOME_READY);
+
+      context.scheduler.getDispatcher().getEventHandler().handle(event);
+    }));
+    GenericTestUtils.waitFor(()-> {
+      for (ComponentInstance instance:
+          context.scheduler.getLiveInstances().values()) {
+        if (!instance.getContainerState().equals(ContainerState.READY)) {
+          return false;
+        }
+      }
+      return true;
+    }, CHECK_EVERY_MILLIS, TIMEOUT);
   }
 
-  private ServiceManager createTestServiceManager(String name)
-      throws IOException {
-    ServiceContext context = new ServiceContext();
-    context.service = createBaseDef(name);
-    context.fs = rule.getFs();
-
-    context.scheduler = new ServiceScheduler(context) {
-      @Override
-      protected YarnRegistryViewForProviders createYarnRegistryOperations(
-          ServiceContext context, RegistryOperations registryClient) {
-        return mock(YarnRegistryViewForProviders.class);
+  private void upgradeInstancesOf(ServiceContext context, String compName)
+      throws TimeoutException, InterruptedException {
+    Collection<ComponentInstance> compInstances = context.scheduler
+        .getAllComponents().get(compName).getAllComponentInstances();
+    GenericTestUtils.waitFor(() -> {
+      for (ComponentInstance instance : compInstances) {
+        if (!instance.getContainerState().equals(ContainerState.UPGRADING)) {
+          return false;
+        }
       }
-    };
+      return true;
+    }, CHECK_EVERY_MILLIS, TIMEOUT);
 
-    context.scheduler.init(rule.getConf());
+    // instances of comp1 get upgraded and become ready event is triggered
+    // become ready
+    compInstances.forEach(instance -> {
+      ComponentInstanceEvent event = new ComponentInstanceEvent(
+          instance.getContainer().getId(),
+          ComponentInstanceEventType.BECOME_READY);
 
-    Map<String, org.apache.hadoop.yarn.service.component.Component>
-        componentState = context.scheduler.getAllComponents();
-    context.service.getComponents().forEach(component -> {
-      componentState.put(component.getName(),
-          new org.apache.hadoop.yarn.service.component.Component(component,
-              1L, context));
+      context.scheduler.getDispatcher().getEventHandler().handle(event);
     });
-    return new ServiceManager(context);
+  }
+
+  private ServiceContext createServiceContext(String name)
+      throws Exception {
+    Service service  = createBaseDef(name);
+    ServiceContext context = new MockRunningServiceContext(rule,
+        service);
+    context.scheduler.getDispatcher().setDrainEventsOnStop();
+    context.scheduler.getDispatcher().start();
+    return context;
   }
 
   public static Service createBaseDef(String name) {
@@ -257,4 +332,6 @@ public class TestServiceManager {
         upgradedDef);
   }
 
+  private static final int TIMEOUT = 200000;
+  private static final int CHECK_EVERY_MILLIS = 100;
 }

+ 35 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java

@@ -415,6 +415,41 @@ public class TestYarnNativeServices extends ServiceTestUtils {
     client.actionDestroy(service.getName());
   }
 
+  @Test(timeout = 200000)
+  public void testExpressUpgrade() throws Exception {
+    setupInternal(NUM_NMS);
+    getConf().setBoolean(YARN_SERVICE_UPGRADE_ENABLED, true);
+    ServiceClient client = createClient(getConf());
+
+    Service service = createExampleApplication();
+    client.actionCreate(service);
+    waitForServiceToBeStable(client, service);
+
+    // upgrade the service
+    Component component = service.getComponents().iterator().next();
+    service.setState(ServiceState.EXPRESS_UPGRADING);
+    service.setVersion("v2");
+    component.getConfiguration().getEnv().put("key1", "val1");
+    Component component2 = service.getComponent("compb");
+    component2.getConfiguration().getEnv().put("key2", "val2");
+    client.actionUpgradeExpress(service);
+
+    // wait for upgrade to complete
+    waitForServiceToBeStable(client, service);
+    Service active = client.getStatus(service.getName());
+    Assert.assertEquals("component not stable", ComponentState.STABLE,
+        active.getComponent(component.getName()).getState());
+    Assert.assertEquals("compa does not have new env", "val1",
+        active.getComponent(component.getName()).getConfiguration()
+            .getEnv("key1"));
+    Assert.assertEquals("compb does not have new env", "val2",
+        active.getComponent(component2.getName()).getConfiguration()
+            .getEnv("key2"));
+    LOG.info("Stop/destroy service {}", service);
+    client.actionStop(service.getName(), true);
+    client.actionDestroy(service.getName());
+  }
+
   // Test to verify ANTI_AFFINITY placement policy
   // 1. Start mini cluster with 3 NMs and scheduler placement-constraint handler
   // 2. Create an example service with 3 containers

+ 96 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceApiUtil.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/utils/TestServiceApiUtil.java

@@ -15,11 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.service;
+package org.apache.hadoop.yarn.service.utils;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.registry.client.api.RegistryConstants;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.service.ServiceTestUtils;
 import org.apache.hadoop.yarn.service.api.records.Artifact;
 import org.apache.hadoop.yarn.service.api.records.Component;
 import org.apache.hadoop.yarn.service.api.records.KerberosPrincipal;
@@ -30,8 +31,6 @@ import org.apache.hadoop.yarn.service.api.records.PlacementType;
 import org.apache.hadoop.yarn.service.api.records.Resource;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages;
-import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
-import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -39,6 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -53,7 +53,7 @@ import static org.junit.Assert.assertTrue;
 /**
  * Test for ServiceApiUtil helper methods.
  */
-public class TestServiceApiUtil {
+public class TestServiceApiUtil extends ServiceTestUtils {
   private static final Logger LOG = LoggerFactory
       .getLogger(TestServiceApiUtil.class);
   private static final String EXCEPTION_PREFIX = "Should have thrown " +
@@ -635,10 +635,12 @@ public class TestServiceApiUtil {
 
     try {
       ServiceApiUtil.validateKerberosPrincipal(app.getKerberosPrincipal());
-      Assert.fail(EXCEPTION_PREFIX + "service with invalid principal name format.");
+      Assert.fail(EXCEPTION_PREFIX + "service with invalid principal name " +
+          "format.");
     } catch (IllegalArgumentException e) {
       assertEquals(
-          String.format(RestApiErrorMessages.ERROR_KERBEROS_PRINCIPAL_NAME_FORMAT,
+          String.format(
+              RestApiErrorMessages.ERROR_KERBEROS_PRINCIPAL_NAME_FORMAT,
               kp.getPrincipalName()),
           e.getMessage());
     }
@@ -650,4 +652,92 @@ public class TestServiceApiUtil {
       Assert.fail(NO_EXCEPTION_PREFIX + e.getMessage());
     }
   }
+
+  @Test
+  public void testResolveCompsDependency() {
+    Service service = createExampleApplication();
+    List<String> dependencies = new ArrayList<String>();
+    dependencies.add("compb");
+    Component compa = createComponent("compa");
+    compa.setDependencies(dependencies);
+    Component compb = createComponent("compb");
+    service.addComponent(compa);
+    service.addComponent(compb);
+    List<String> order = ServiceApiUtil.resolveCompsDependency(service);
+    List<String> expected = new ArrayList<String>();
+    expected.add("compb");
+    expected.add("compa");
+    for (int i = 0; i < expected.size(); i++) {
+      Assert.assertEquals("Components are not equal.", expected.get(i),
+          order.get(i));
+    }
+  }
+
+  @Test
+  public void testResolveCompsDependencyReversed() {
+    Service service = createExampleApplication();
+    List<String> dependencies = new ArrayList<String>();
+    dependencies.add("compa");
+    Component compa = createComponent("compa");
+    Component compb = createComponent("compb");
+    compb.setDependencies(dependencies);
+    service.addComponent(compa);
+    service.addComponent(compb);
+    List<String> order = ServiceApiUtil.resolveCompsDependency(service);
+    List<String> expected = new ArrayList<String>();
+    expected.add("compa");
+    expected.add("compb");
+    for (int i = 0; i < expected.size(); i++) {
+      Assert.assertEquals("Components are not equal.", expected.get(i),
+          order.get(i));
+    }
+  }
+
+  @Test
+  public void testResolveCompsCircularDependency() {
+    Service service = createExampleApplication();
+    List<String> dependencies = new ArrayList<String>();
+    List<String> dependencies2 = new ArrayList<String>();
+    dependencies.add("compb");
+    dependencies2.add("compa");
+    Component compa = createComponent("compa");
+    compa.setDependencies(dependencies);
+    Component compb = createComponent("compb");
+    compa.setDependencies(dependencies2);
+    service.addComponent(compa);
+    service.addComponent(compb);
+    List<String> order = ServiceApiUtil.resolveCompsDependency(service);
+    List<String> expected = new ArrayList<String>();
+    expected.add("compa");
+    expected.add("compb");
+    for (int i = 0; i < expected.size(); i++) {
+      Assert.assertEquals("Components are not equal.", expected.get(i),
+          order.get(i));
+    }
+  }
+
+  @Test
+  public void testResolveNoCompsDependency() {
+    Service service = createExampleApplication();
+    Component compa = createComponent("compa");
+    Component compb = createComponent("compb");
+    service.addComponent(compa);
+    service.addComponent(compb);
+    List<String> order = ServiceApiUtil.resolveCompsDependency(service);
+    List<String> expected = new ArrayList<String>();
+    expected.add("compa");
+    expected.add("compb");
+    for (int i = 0; i < expected.size(); i++) {
+      Assert.assertEquals("Components are not equal.", expected.get(i),
+          order.get(i));
+    }
+  }
+
+  public static Service createExampleApplication() {
+
+    Service exampleApp = new Service();
+    exampleApp.setName("example-app");
+    exampleApp.setVersion("v1");
+    return exampleApp;
+  }
 }

+ 16 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.client.cli;
 
 import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
@@ -100,6 +101,7 @@ public class ApplicationCLI extends YarnCLI {
   public static final String COMPONENT = "component";
   public static final String ENABLE_FAST_LAUNCH = "enableFastLaunch";
   public static final String UPGRADE_CMD = "upgrade";
+  public static final String UPGRADE_EXPRESS = "express";
   public static final String UPGRADE_INITIATE = "initiate";
   public static final String UPGRADE_AUTO_FINALIZE = "autoFinalize";
   public static final String UPGRADE_FINALIZE = "finalize";
@@ -247,6 +249,9 @@ public class ApplicationCLI extends YarnCLI {
       opts.addOption(UPGRADE_CMD, true, "Upgrades an application/long-" +
           "running service. It requires either -initiate, -instances, or " +
           "-finalize options.");
+      opts.addOption(UPGRADE_EXPRESS, true, "Works with -upgrade option to " +
+          "perform express upgrade.  It requires the upgraded application " +
+          "specification file.");
       opts.addOption(UPGRADE_INITIATE, true, "Works with -upgrade option to " +
           "initiate the application upgrade. It requires the upgraded " +
           "application specification file.");
@@ -639,9 +644,9 @@ public class ApplicationCLI extends YarnCLI {
       moveApplicationAcrossQueues(cliParser.getOptionValue(APP_ID),
           cliParser.getOptionValue(CHANGE_APPLICATION_QUEUE));
     } else if (cliParser.hasOption(UPGRADE_CMD)) {
-      if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD, UPGRADE_INITIATE,
-          UPGRADE_AUTO_FINALIZE, UPGRADE_FINALIZE, COMPONENT_INSTS, COMPONENTS,
-          APP_TYPE_CMD)) {
+      if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD, UPGRADE_EXPRESS,
+          UPGRADE_INITIATE, UPGRADE_AUTO_FINALIZE, UPGRADE_FINALIZE,
+          COMPONENT_INSTS, COMPONENTS, APP_TYPE_CMD)) {
         printUsage(title, opts);
         return exitCode;
       }
@@ -649,7 +654,14 @@ public class ApplicationCLI extends YarnCLI {
       AppAdminClient client =  AppAdminClient.createAppAdminClient(appType,
           getConf());
       String appName = cliParser.getOptionValue(UPGRADE_CMD);
-      if (cliParser.hasOption(UPGRADE_INITIATE)) {
+      if (cliParser.hasOption(UPGRADE_EXPRESS)) {
+        File file = new File(cliParser.getOptionValue(UPGRADE_EXPRESS));
+        if (!file.exists()) {
+          System.err.println(file.getAbsolutePath() + " does not exist.");
+          return exitCode;
+        }
+        return client.actionUpgradeExpress(appName, file);
+      } else if (cliParser.hasOption(UPGRADE_INITIATE)) {
         if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD,
             UPGRADE_INITIATE, UPGRADE_AUTO_FINALIZE, APP_TYPE_CMD)) {
           printUsage(title, opts);

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java

@@ -2161,6 +2161,10 @@ public class TestYarnCLI {
     pw.println("                                          Optionally a destination folder");
     pw.println("                                          for the tarball can be");
     pw.println("                                          specified.");
+    pw.println(" -express <arg>                           Works with -upgrade option to");
+    pw.println("                                          perform express upgrade.  It");
+    pw.println("                                          requires the upgraded");
+    pw.println("                                          application specification file.");
     pw.println(" -finalize                                Works with -upgrade option to");
     pw.println("                                          finalize the upgrade.");
     pw.println(" -flex <Application Name or ID>           Changes number of running");

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -288,4 +289,15 @@ public abstract class AppAdminClient extends CompositeService {
       List<String> components, String version, List<String> containerStates)
       throws IOException, YarnException;
 
+  /**
+   * Express upgrade a long running service.
+   *
+   * @param appName  the name of the application
+   * @param fileName specification of application upgrade to save.
+   * @return exit code
+   */
+  @Public
+  @Unstable
+  public abstract int actionUpgradeExpress(String appName, File fileName)
+      throws IOException, YarnException;
 }