فهرست منبع

YARN-7939. Added support to upgrade a component instance.
Contributed by Chandni Singh

Eric Yang 7 سال پیش
والد
کامیت
4a7369b095
41 فایلهای تغییر یافته به همراه 1961 افزوده شده و 287 حذف شده
  1. 71 15
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
  2. 169 19
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
  3. 98 22
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java
  4. 67 20
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java
  5. 26 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java
  6. 6 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java
  7. 32 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
  8. 10 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java
  9. 10 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
  10. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java
  11. 118 38
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
  12. 27 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
  13. 21 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java
  14. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java
  15. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
  16. 82 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
  17. 128 25
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
  18. 10 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
  19. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
  20. 43 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
  21. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java
  22. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java
  23. 88 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
  24. 14 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java
  25. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java
  26. 13 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java
  27. 5 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java
  28. 18 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
  29. 47 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
  30. 11 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
  31. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java
  32. 112 10
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
  33. 28 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
  34. 75 15
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java
  35. 136 51
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java
  36. 265 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java
  37. 88 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java
  38. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java
  39. 20 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java
  40. 65 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
  41. 21 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java

+ 71 - 15
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java

@@ -26,6 +26,7 @@ import java.util.Map;
 
 import javax.ws.rs.core.MediaType;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -40,11 +41,16 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.Container;
+import org.apache.hadoop.yarn.service.api.records.ContainerState;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.ServiceState;
 import org.apache.hadoop.yarn.service.api.records.ServiceStatus;
+import org.apache.hadoop.yarn.service.conf.RestApiConstants;
+import org.apache.hadoop.yarn.service.utils.JsonSerDeser;
 import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
 import org.apache.hadoop.yarn.util.RMHAUtils;
+import org.codehaus.jackson.map.PropertyNamingStrategy;
 import org.eclipse.jetty.util.UrlEncoded;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -131,7 +137,7 @@ public class ApiServiceClient extends AppAdminClient {
    * @return URI to API Service
    * @throws IOException
    */
-  private String getApiUrl(String appName) throws IOException {
+  private String getServicePath(String appName) throws IOException {
     String url = getRMWebAddress();
     StringBuilder api = new StringBuilder();
     api.append(url);
@@ -148,23 +154,40 @@ public class ApiServiceClient extends AppAdminClient {
     return api.toString();
   }
 
+  private String getInstancesPath(String appName) throws IOException {
+    Preconditions.checkNotNull(appName);
+    String url = getRMWebAddress();
+    StringBuilder api = new StringBuilder();
+    api.append(url);
+    api.append("/app/v1/services/").append(appName).append("/")
+        .append(RestApiConstants.COMP_INSTANCES);
+    Configuration conf = getConfig();
+    if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase(
+        "simple")) {
+      api.append("?user.name=" + UrlEncoded
+          .encodeString(System.getProperty("user.name")));
+    }
+    return api.toString();
+  }
+
   private Builder getApiClient() throws IOException {
-    return getApiClient(null);
+    return getApiClient(getServicePath(null));
   }
 
   /**
    * Setup API service web request.
    *
-   * @param appName
+   * @param requestPath
    * @return
    * @throws IOException
    */
-  private Builder getApiClient(String appName) throws IOException {
+  private Builder getApiClient(String requestPath)
+      throws IOException {
     Client client = Client.create(getClientConfig());
     Configuration conf = getConfig();
     client.setChunkedEncodingSize(null);
     Builder builder = client
-        .resource(getApiUrl(appName)).type(MediaType.APPLICATION_JSON);
+        .resource(requestPath).type(MediaType.APPLICATION_JSON);
     if (conf.get("hadoop.http.authentication.type").equals("kerberos")) {
       AuthenticatedURL.Token token = new AuthenticatedURL.Token();
       builder.header("WWW-Authenticate", token);
@@ -312,7 +335,7 @@ public class ApiServiceClient extends AppAdminClient {
       service.setName(appName);
       service.setState(ServiceState.STOPPED);
       String buffer = jsonSerDeser.toJson(service);
-      ClientResponse response = getApiClient(appName)
+      ClientResponse response = getApiClient(getServicePath(appName))
           .put(ClientResponse.class, buffer);
       result = processResponse(response);
     } catch (Exception e) {
@@ -335,7 +358,7 @@ public class ApiServiceClient extends AppAdminClient {
       service.setName(appName);
       service.setState(ServiceState.STARTED);
       String buffer = jsonSerDeser.toJson(service);
-      ClientResponse response = getApiClient(appName)
+      ClientResponse response = getApiClient(getServicePath(appName))
           .put(ClientResponse.class, buffer);
       result = processResponse(response);
     } catch (Exception e) {
@@ -381,7 +404,7 @@ public class ApiServiceClient extends AppAdminClient {
   public int actionDestroy(String appName) throws IOException, YarnException {
     int result = EXIT_SUCCESS;
     try {
-      ClientResponse response = getApiClient(appName)
+      ClientResponse response = getApiClient(getServicePath(appName))
           .delete(ClientResponse.class);
       result = processResponse(response);
     } catch (Exception e) {
@@ -413,7 +436,7 @@ public class ApiServiceClient extends AppAdminClient {
         service.addComponent(component);
       }
       String buffer = jsonSerDeser.toJson(service);
-      ClientResponse response = getApiClient(appName)
+      ClientResponse response = getApiClient(getServicePath(appName))
           .put(ClientResponse.class, buffer);
       result = processResponse(response);
     } catch (Exception e) {
@@ -454,7 +477,8 @@ public class ApiServiceClient extends AppAdminClient {
       ServiceApiUtil.validateNameFormat(appName, getConfig());
     }
     try {
-      ClientResponse response = getApiClient(appName).get(ClientResponse.class);
+      ClientResponse response = getApiClient(getServicePath(appName))
+          .get(ClientResponse.class);
       if (response.getStatus() != 200) {
         StringBuilder sb = new StringBuilder();
         sb.append(appName);
@@ -470,16 +494,20 @@ public class ApiServiceClient extends AppAdminClient {
   }
 
   @Override
-  public int actionUpgrade(String appName,
-      String fileName) throws IOException, YarnException {
+  public int initiateUpgrade(String appName,
+      String fileName, boolean autoFinalize) throws IOException, YarnException {
     int result;
     try {
       Service service =
           loadAppJsonFromLocalFS(fileName, appName, null, null);
-      service.setState(ServiceState.UPGRADING);
+      if (autoFinalize) {
+        service.setState(ServiceState.UPGRADING_AUTO_FINALIZE);
+      } else {
+        service.setState(ServiceState.UPGRADING);
+      }
       String buffer = jsonSerDeser.toJson(service);
-      ClientResponse response = getApiClient()
-          .post(ClientResponse.class, buffer);
+      ClientResponse response = getApiClient(getServicePath(appName))
+          .put(ClientResponse.class, buffer);
       result = processResponse(response);
     } catch (Exception e) {
       LOG.error("Failed to upgrade application: ", e);
@@ -487,4 +515,32 @@ public class ApiServiceClient extends AppAdminClient {
     }
     return result;
   }
+
+  @Override
+  public int actionUpgradeInstances(String appName, List<String> compInstances)
+      throws IOException, YarnException {
+    int result;
+    Container[] toUpgrade = new Container[compInstances.size()];
+    try {
+      int idx = 0;
+      for (String instanceName : compInstances) {
+        Container container = new Container();
+        container.setComponentInstanceName(instanceName);
+        container.setState(ContainerState.UPGRADING);
+        toUpgrade[idx++] = container;
+      }
+      String buffer = containerJsonSerde.toJson(toUpgrade);
+      ClientResponse response = getApiClient(getInstancesPath(appName))
+          .put(ClientResponse.class, buffer);
+      result = processResponse(response);
+    } catch (Exception e) {
+      LOG.error("Failed to upgrade component instance: ", e);
+      result = EXIT_EXCEPTION_THROWN;
+    }
+    return result;
+  }
+
+  private static JsonSerDeser<Container[]> containerJsonSerde =
+      new JsonSerDeser<>(Container[].class,
+      PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
 }

+ 169 - 19
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java

@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.yarn.service.webapp;
 
+import com.google.common.collect.Lists;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
@@ -28,10 +29,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.Container;
+import org.apache.hadoop.yarn.service.api.records.ContainerState;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.ServiceState;
 import org.apache.hadoop.yarn.service.api.records.ServiceStatus;
 import org.apache.hadoop.yarn.service.client.ServiceClient;
+import org.apache.hadoop.yarn.service.conf.RestApiConstants;
+import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,9 +58,12 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.yarn.service.api.records.ServiceState.ACCEPTED;
 import static org.apache.hadoop.yarn.service.conf.RestApiConstants.*;
@@ -177,17 +185,7 @@ public class ApiServer {
       }
       UserGroupInformation ugi = getProxyUser(request);
       LOG.info("GET: getService for appName = {} user = {}", appName, ugi);
-      Service app = ugi.doAs(new PrivilegedExceptionAction<Service>() {
-        @Override
-        public Service run() throws IOException, YarnException {
-          ServiceClient sc = getServiceClient();
-          sc.init(YARN_CONFIG);
-          sc.start();
-          Service app = sc.getStatus(appName);
-          sc.close();
-          return app;
-        }
-      });
+      Service app = getServiceFromClient(ugi, appName);
       return Response.ok(app).build();
     } catch (AccessControlException e) {
       return formatResponse(Status.FORBIDDEN, e.getMessage());
@@ -393,17 +391,19 @@ public class ApiServer {
         return startService(appName, ugi);
       }
 
+      // If an UPGRADE is requested
+      if (updateServiceData.getState() != null && (
+          updateServiceData.getState() == ServiceState.UPGRADING ||
+              updateServiceData.getState() ==
+                  ServiceState.UPGRADING_AUTO_FINALIZE)) {
+        return upgradeService(updateServiceData, ugi);
+      }
+
       // If new lifetime value specified then update it
       if (updateServiceData.getLifetime() != null
           && updateServiceData.getLifetime() > 0) {
         return updateLifetime(appName, updateServiceData, ugi);
       }
-
-      // If an UPGRADE is requested
-      if (updateServiceData.getState() != null &&
-          updateServiceData.getState() == ServiceState.UPGRADING) {
-        return upgradeService(updateServiceData, ugi);
-      }
     } catch (UndeclaredThrowableException e) {
       return formatResponse(Status.BAD_REQUEST,
           e.getCause().getMessage());
@@ -427,6 +427,103 @@ public class ApiServer {
     return Response.status(Status.NO_CONTENT).build();
   }
 
+  @PUT
+  @Path(COMP_INSTANCE_LONG_PATH)
+  @Consumes({MediaType.APPLICATION_JSON})
+  @Produces({RestApiConstants.MEDIA_TYPE_JSON_UTF8, MediaType.TEXT_PLAIN})
+  public Response updateComponentInstance(@Context HttpServletRequest request,
+      @PathParam(SERVICE_NAME) String serviceName,
+      @PathParam(COMPONENT_NAME) String componentName,
+      @PathParam(COMP_INSTANCE_NAME) String compInstanceName,
+      Container reqContainer) {
+
+    try {
+      UserGroupInformation ugi = getProxyUser(request);
+      LOG.info("PUT: update component instance {} for component = {}" +
+              " service = {} user = {}", compInstanceName, componentName,
+          serviceName, ugi);
+      if (reqContainer == null) {
+        throw new YarnException("No container data provided.");
+      }
+      Service service = getServiceFromClient(ugi, serviceName);
+      Component component = service.getComponent(componentName);
+      if (component == null) {
+        throw new YarnException(String.format(
+            "The component name in the URI path (%s) is invalid.",
+            componentName));
+      }
+
+      Container liveContainer = component.getComponentInstance(
+          compInstanceName);
+      if (liveContainer == null) {
+        throw new YarnException(String.format(
+            "The component (%s) does not have a component instance (%s).",
+            componentName, compInstanceName));
+      }
+
+      if (reqContainer.getState() != null
+          && reqContainer.getState().equals(ContainerState.UPGRADING)) {
+        return processContainerUpgrade(ugi, service,
+            Lists.newArrayList(liveContainer));
+      }
+    } catch (AccessControlException e) {
+      return formatResponse(Response.Status.FORBIDDEN, e.getMessage());
+    } catch (YarnException e) {
+      return formatResponse(Response.Status.BAD_REQUEST, e.getMessage());
+    } catch (IOException | InterruptedException e) {
+      return formatResponse(Response.Status.INTERNAL_SERVER_ERROR,
+          e.getMessage());
+    } catch (UndeclaredThrowableException e) {
+      return formatResponse(Response.Status.INTERNAL_SERVER_ERROR,
+          e.getCause().getMessage());
+    }
+    return Response.status(Status.NO_CONTENT).build();
+  }
+
+  @PUT
+  @Path(COMP_INSTANCES_PATH)
+  @Consumes({MediaType.APPLICATION_JSON})
+  @Produces({RestApiConstants.MEDIA_TYPE_JSON_UTF8, MediaType.TEXT_PLAIN})
+  public Response updateComponentInstances(@Context HttpServletRequest request,
+      @PathParam(SERVICE_NAME) String serviceName,
+      List<Container> requestContainers) {
+
+    try {
+      if (requestContainers == null || requestContainers.isEmpty()) {
+        throw new YarnException("No containers provided.");
+      }
+      UserGroupInformation ugi = getProxyUser(request);
+      List<String> toUpgrade = new ArrayList<>();
+      for (Container reqContainer : requestContainers) {
+        if (reqContainer.getState() != null &&
+            reqContainer.getState().equals(ContainerState.UPGRADING)) {
+          toUpgrade.add(reqContainer.getComponentInstanceName());
+        }
+      }
+
+      if (!toUpgrade.isEmpty()) {
+        Service service = getServiceFromClient(ugi, serviceName);
+        LOG.info("PUT: upgrade component instances {} for service = {} " +
+            "user = {}", toUpgrade, serviceName, ugi);
+        List<Container> liveContainers = ServiceApiUtil
+            .getLiveContainers(service, toUpgrade);
+
+        return processContainerUpgrade(ugi, service, liveContainers);
+      }
+    } catch (AccessControlException e) {
+      return formatResponse(Response.Status.FORBIDDEN, e.getMessage());
+    } catch (YarnException e) {
+      return formatResponse(Response.Status.BAD_REQUEST, e.getMessage());
+    } catch (IOException | InterruptedException e) {
+      return formatResponse(Response.Status.INTERNAL_SERVER_ERROR,
+          e.getMessage());
+    } catch (UndeclaredThrowableException e) {
+      return formatResponse(Response.Status.INTERNAL_SERVER_ERROR,
+          e.getCause().getMessage());
+    }
+    return Response.status(Status.NO_CONTENT).build();
+  }
+
   private Response flexService(Service service, UserGroupInformation ugi)
       throws IOException, InterruptedException {
     String appName = service.getName();
@@ -511,17 +608,70 @@ public class ApiServer {
       ServiceClient sc = getServiceClient();
       sc.init(YARN_CONFIG);
       sc.start();
-      sc.actionUpgrade(service);
+      sc.initiateUpgrade(service);
       sc.close();
       return null;
     });
-    LOG.info("Service {} version {} upgrade initialized");
+    LOG.info("Service {} version {} upgrade initialized", service.getName(),
+        service.getVersion());
     status.setDiagnostics("Service " + service.getName() +
         " version " + service.getVersion() + " saved.");
     status.setState(ServiceState.ACCEPTED);
     return formatResponse(Status.ACCEPTED, status);
   }
 
+  private Response processContainerUpgrade(UserGroupInformation ugi,
+      Service service, List<Container> containers) throws YarnException,
+      IOException, InterruptedException {
+
+    if (service.getState() != ServiceState.UPGRADING) {
+      throw new YarnException(
+          String.format("The upgrade of service %s has not been initiated.",
+              service.getName()));
+    }
+    for (Container liveContainer : containers) {
+      if (liveContainer.getState() != ContainerState.NEEDS_UPGRADE) {
+        // Nothing to upgrade
+        throw new YarnException(String.format(
+            "The component instance (%s) does not need an upgrade.",
+            liveContainer.getComponentInstanceName()));
+      }
+    }
+
+    Integer result = ugi.doAs((PrivilegedExceptionAction<Integer>) () -> {
+      int result1;
+      ServiceClient sc = getServiceClient();
+      sc.init(YARN_CONFIG);
+      sc.start();
+      result1 = sc.actionUpgrade(service, containers);
+      sc.close();
+      return result1;
+    });
+
+    if (result == EXIT_SUCCESS) {
+      ServiceStatus status = new ServiceStatus();
+      status.setDiagnostics(
+          "Upgrading component instances " + containers.stream()
+              .map(Container::getId).collect(Collectors.joining(",")) + ".");
+      return formatResponse(Response.Status.ACCEPTED, status);
+    }
+    // If result is not a success, consider it a no-op
+    return Response.status(Response.Status.NO_CONTENT).build();
+  }
+
+  private Service getServiceFromClient(UserGroupInformation ugi,
+      String serviceName) throws IOException, InterruptedException {
+
+    return ugi.doAs((PrivilegedExceptionAction<Service>) () -> {
+      ServiceClient sc = getServiceClient();
+      sc.init(YARN_CONFIG);
+      sc.start();
+      Service app1 = sc.getStatus(serviceName);
+      sc.close();
+      return app1;
+    });
+  }
+
   /**
    * Used by negative test case.
    *

+ 98 - 22
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java

@@ -17,17 +17,24 @@
 
 package org.apache.hadoop.yarn.service;
 
-import java.io.IOException;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.service.api.records.Artifact;
+import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.Container;
+import org.apache.hadoop.yarn.service.api.records.ContainerState;
+import org.apache.hadoop.yarn.service.api.records.Resource;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.client.ServiceClient;
 import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
 import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * A mock version of ServiceClient - This class is design
  * to simulate various error conditions that will happen
@@ -36,14 +43,31 @@ import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
 public class ServiceClientTest extends ServiceClient {
 
   private Configuration conf = new Configuration();
-
-  protected static void init() {
-  }
+  private Service goodServiceStatus = buildLiveGoodService();
+  private boolean initialized;
 
   public ServiceClientTest() {
     super();
   }
 
+  @Override
+  public void init(Configuration conf) {
+    if (!initialized) {
+      super.init(conf);
+      initialized = true;
+    }
+  }
+
+  @Override
+  public void stop() {
+    // This is needed for testing  API Server which use client to get status
+    // and then perform an action.
+  }
+
+  public void forceStop() {
+    super.stop();
+  }
+
   @Override
   public Configuration getConfig() {
     return conf;
@@ -58,11 +82,8 @@ public class ServiceClientTest extends ServiceClient {
 
   @Override
   public Service getStatus(String appName) {
-    if (appName == null) {
-      throw new NullPointerException();
-    }
-    if (appName.equals("jenkins")) {
-      return new Service();
+    if (appName != null && appName.equals("jenkins")) {
+      return goodServiceStatus;
     } else {
       throw new IllegalArgumentException();
     }
@@ -71,10 +92,7 @@ public class ServiceClientTest extends ServiceClient {
   @Override
   public int actionStart(String serviceName)
       throws YarnException, IOException {
-    if (serviceName == null) {
-      throw new NullPointerException();
-    }
-    if (serviceName.equals("jenkins")) {
+    if (serviceName != null && serviceName.equals("jenkins")) {
       return EXIT_SUCCESS;
     } else {
       throw new ApplicationNotFoundException("");
@@ -98,19 +116,77 @@ public class ServiceClientTest extends ServiceClient {
 
   @Override
   public int actionDestroy(String serviceName) {
-    if (serviceName == null) {
-      throw new NullPointerException();
+    if (serviceName != null) {
+      if (serviceName.equals("jenkins")) {
+        return EXIT_SUCCESS;
+      } else if (serviceName.equals("jenkins-already-stopped")) {
+        return EXIT_SUCCESS;
+      } else if (serviceName.equals("jenkins-doesn't-exist")) {
+        return EXIT_NOT_FOUND;
+      } else if (serviceName.equals("jenkins-error-cleaning-registry")) {
+        return EXIT_OTHER_FAILURE;
+      }
     }
-    if (serviceName.equals("jenkins")) {
+    throw new IllegalArgumentException();
+  }
+
+  @Override
+  public int initiateUpgrade(Service service) throws YarnException,
+      IOException {
+    if (service.getName() != null && service.getName().equals("jenkins")) {
       return EXIT_SUCCESS;
-    } else if (serviceName.equals("jenkins-already-stopped")) {
+    } else {
+      throw new IllegalArgumentException();
+    }
+  }
+
+  @Override
+  public int actionUpgrade(Service service, List<Container> compInstances)
+      throws IOException, YarnException {
+    if (service.getName() != null && service.getName().equals("jenkins")) {
       return EXIT_SUCCESS;
-    } else if (serviceName.equals("jenkins-doesn't-exist")) {
-      return EXIT_NOT_FOUND;
-    } else if (serviceName.equals("jenkins-error-cleaning-registry")) {
-      return EXIT_OTHER_FAILURE;
     } else {
       throw new IllegalArgumentException();
     }
   }
+
+  Service getGoodServiceStatus() {
+    return goodServiceStatus;
+  }
+
+  static Service buildGoodService() {
+    Service service = new Service();
+    service.setName("jenkins");
+    service.setVersion("v1");
+    Artifact artifact = new Artifact();
+    artifact.setType(Artifact.TypeEnum.DOCKER);
+    artifact.setId("jenkins:latest");
+    Resource resource = new Resource();
+    resource.setCpus(1);
+    resource.setMemory("2048");
+    List<Component> components = new ArrayList<>();
+    Component c = new Component();
+    c.setName("jenkins");
+    c.setNumberOfContainers(2L);
+    c.setArtifact(artifact);
+    c.setLaunchCommand("");
+    c.setResource(resource);
+    components.add(c);
+    service.setComponents(components);
+    return service;
+  }
+
+  static Service buildLiveGoodService() {
+    Service service = buildGoodService();
+    Component comp = service.getComponents().iterator().next();
+    List<Container> containers = new ArrayList<>();
+    for (int i = 0; i < comp.getNumberOfContainers(); i++) {
+      Container container = new Container();
+      container.setComponentInstanceName(comp.getName() + "-" + (i + 1));
+      container.setState(ContainerState.READY);
+      containers.add(container);
+    }
+    comp.setContainers(containers);
+    return service;
+  }
 }

+ 67 - 20
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java

@@ -35,12 +35,14 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.service.api.records.Artifact;
 import org.apache.hadoop.yarn.service.api.records.Artifact.TypeEnum;
 import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.Container;
+import org.apache.hadoop.yarn.service.api.records.ContainerState;
 import org.apache.hadoop.yarn.service.api.records.Resource;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.ServiceState;
 import org.apache.hadoop.yarn.service.api.records.ServiceStatus;
-import org.apache.hadoop.yarn.service.client.ServiceClient;
 import org.apache.hadoop.yarn.service.webapp.ApiServer;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -52,13 +54,14 @@ import org.mockito.Mockito;
 public class TestApiServer {
   private ApiServer apiServer;
   private HttpServletRequest request;
+  private ServiceClientTest mockServerClient;
 
   @Before
   public void setup() throws Exception {
     request = Mockito.mock(HttpServletRequest.class);
     Mockito.when(request.getRemoteUser())
         .thenReturn(System.getProperty("user.name"));
-    ServiceClient mockServerClient = new ServiceClientTest();
+    mockServerClient = new ServiceClientTest();
     Configuration conf = new Configuration();
     conf.set("yarn.api-service.service.client.class",
         ServiceClientTest.class.getName());
@@ -66,6 +69,11 @@ public class TestApiServer {
     apiServer.setServiceClient(mockServerClient);
   }
 
+  /** Force-stops the stub service client after each test. */
+  @After
+  public void teardown() {
+    this.mockServerClient.forceStop();
+  }
+
   @Test
   public void testPathAnnotation() {
     assertNotNull(this.apiServer.getClass().getAnnotation(Path.class));
@@ -107,24 +115,7 @@ public class TestApiServer {
     BufferedWriter bw = new BufferedWriter(new FileWriter(dockerConfig));
     bw.write(json);
     bw.close();
-    Service service = new Service();
-    service.setName("jenkins");
-    service.setVersion("v1");
-    Artifact artifact = new Artifact();
-    artifact.setType(TypeEnum.DOCKER);
-    artifact.setId("jenkins:latest");
-    Resource resource = new Resource();
-    resource.setCpus(1);
-    resource.setMemory("2048");
-    List<Component> components = new ArrayList<Component>();
-    Component c = new Component();
-    c.setName("jenkins");
-    c.setNumberOfContainers(1L);
-    c.setArtifact(artifact);
-    c.setLaunchCommand("");
-    c.setResource(resource);
-    components.add(c);
-    service.setComponents(components);
+    Service service = ServiceClientTest.buildGoodService();
     final Response actual = apiServer.createService(request, service);
     assertEquals("Create service is ",
         Response.status(Status.ACCEPTED).build().getStatus(),
@@ -495,4 +486,60 @@ public class TestApiServer {
             + "that in the URI path (jenkins-master)",
         serviceStatus.getDiagnostics());
   }
+
+  /**
+   * Submitting the live service with a new version and UPGRADING state is
+   * expected to be answered with ACCEPTED.
+   */
+  @Test
+  public void testInitiateUpgrade() {
+    Service upgradeSpec = ServiceClientTest.buildLiveGoodService();
+    upgradeSpec.setVersion("v2");
+    upgradeSpec.setState(ServiceState.UPGRADING);
+
+    final Response response = apiServer.updateService(request,
+        upgradeSpec.getName(), upgradeSpec);
+
+    int acceptedCode = Response.status(Status.ACCEPTED).build().getStatus();
+    assertEquals("Initiate upgrade is ", acceptedCode, response.getStatus());
+  }
+
+  /**
+   * Requesting the upgrade of one component instance is expected to be
+   * answered with ACCEPTED.
+   */
+  @Test
+  public void testUpgradeSingleInstance() {
+    // Request payload: first instance of the first component, marked
+    // UPGRADING.
+    Service requestedService = ServiceClientTest.buildLiveGoodService();
+    Component targetComp = requestedService.getComponents().iterator().next();
+    Container targetInstance = targetComp.getContainers().iterator().next();
+    targetInstance.setState(ContainerState.UPGRADING);
+
+    // To be able to upgrade, the live service needs to be in UPGRADING
+    // and the container state needs to be NEEDS_UPGRADE.
+    Service liveStatus = mockServerClient.getGoodServiceStatus();
+    liveStatus.setState(ServiceState.UPGRADING);
+    Container liveInstance = liveStatus.getComponents().iterator().next()
+        .getContainers().iterator().next();
+    liveInstance.setState(ContainerState.NEEDS_UPGRADE);
+
+    final Response response = apiServer.updateComponentInstance(request,
+        requestedService.getName(), targetComp.getName(),
+        targetInstance.getComponentInstanceName(), targetInstance);
+    assertEquals("Instance upgrade is ",
+        Response.status(Status.ACCEPTED).build().getStatus(),
+        response.getStatus());
+  }
+
+  /**
+   * Requesting the upgrade of every instance of a component in one call is
+   * expected to be answered with ACCEPTED.
+   */
+  @Test
+  public void testUpgradeMultipleInstances() {
+    Service requestedService = ServiceClientTest.buildLiveGoodService();
+    Component targetComp = requestedService.getComponents().iterator().next();
+    for (Container requested : targetComp.getContainers()) {
+      requested.setState(ContainerState.UPGRADING);
+    }
+
+    // To be able to upgrade, the live service needs to be in UPGRADING
+    // and the container states need to be NEEDS_UPGRADE.
+    Service liveStatus = mockServerClient.getGoodServiceStatus();
+    liveStatus.setState(ServiceState.UPGRADING);
+    Component liveComp = liveStatus.getComponents().iterator().next();
+    for (Container live : liveComp.getContainers()) {
+      live.setState(ContainerState.NEEDS_UPGRADE);
+    }
+
+    final Response response = apiServer.updateComponentInstances(request,
+        requestedService.getName(), targetComp.getContainers());
+    assertEquals("Instance upgrade is ",
+        Response.status(Status.ACCEPTED).build().getStatus(),
+        response.getStatus());
+  }
 }

+ 26 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java

@@ -26,6 +26,7 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.eclipse.jetty.server.Server;
@@ -256,4 +257,29 @@ public class TestApiServiceClient {
     }
   }
 
+  /**
+   * Initiating an upgrade (without auto-finalize) through the API service
+   * client is expected to return EXIT_SUCCESS.
+   */
+  @Test
+  public void testInitiateServiceUpgrade() {
+    String appName = "example-app";
+    String upgradeFileName = "target/test-classes/example-app.json";
+    try {
+      int result = asc.initiateUpgrade(appName, upgradeFileName, false);
+      assertEquals(EXIT_SUCCESS, result);
+    } catch (IOException | YarnException e) {
+      // Keep the cause so a failing run shows why the call was rejected.
+      throw new AssertionError("Initiating upgrade of " + appName
+          + " failed", e);
+    }
+  }
+
+  /**
+   * Upgrading a list of component instances through the API service client
+   * is expected to return EXIT_SUCCESS.
+   */
+  @Test
+  public void testInstancesUpgrade() {
+    String appName = "example-app";
+    try {
+      int result = asc.actionUpgradeInstances(appName, Lists.newArrayList(
+          "comp-1", "comp-2"));
+      assertEquals(EXIT_SUCCESS, result);
+    } catch (IOException | YarnException e) {
+      // Keep the cause so a failing run shows why the call was rejected.
+      throw new AssertionError("Upgrading instances of " + appName
+          + " failed", e);
+    }
+  }
+
+
 }

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java

@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.service;
 
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
@@ -49,4 +51,8 @@ public interface ClientAMProtocol {
 
   RestartServiceResponseProto restart(RestartServiceRequestProto request)
       throws IOException, YarnException;
+
+  /**
+   * Requests an upgrade of the component instances identified by the
+   * request.
+   */
+  CompInstancesUpgradeResponseProto upgrade(
+      CompInstancesUpgradeRequestProto request)
+      throws IOException, YarnException;
 }

+ 32 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java

@@ -26,8 +26,11 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
@@ -40,6 +43,8 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
 import org.apache.hadoop.yarn.service.component.ComponentEvent;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
 import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -151,12 +156,16 @@ public class ClientAMService extends AbstractService
   @Override
   public UpgradeServiceResponseProto upgrade(
       UpgradeServiceRequestProto request) throws IOException {
-    ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE);
-    event.setVersion(request.getVersion());
-    context.scheduler.getDispatcher().getEventHandler().handle(event);
-    LOG.info("Upgrading service to version {} by {}", request.getVersion(),
-        UserGroupInformation.getCurrentUser());
-    return UpgradeServiceResponseProto.newBuilder().build();
+    try {
+      context.getServiceManager().processUpgradeRequest(request.getVersion(),
+          request.getAutoFinalize());
+      LOG.info("Upgrading service to version {} by {}", request.getVersion(),
+          UserGroupInformation.getCurrentUser());
+      return UpgradeServiceResponseProto.newBuilder().build();
+    } catch (Exception ex) {
+      // The failure only reaches the client as a message in the response,
+      // so log it here or the stack trace is lost on the AM side.
+      LOG.error("Failed to upgrade service to version {}",
+          request.getVersion(), ex);
+      // Generated protobuf setters reject null and getMessage() may be null
+      // (e.g. for a NullPointerException), so fall back to toString().
+      String error = ex.getMessage() != null ? ex.getMessage() : ex.toString();
+      return UpgradeServiceResponseProto.newBuilder().setError(error).build();
+    }
   }
 
   @Override
@@ -167,4 +176,21 @@ public class ClientAMService extends AbstractService
     LOG.info("Restart service by {}", UserGroupInformation.getCurrentUser());
     return RestartServiceResponseProto.newBuilder().build();
   }
+
+  /**
+   * Dispatches an UPGRADE component-instance event for every container id
+   * listed in the request and returns an empty response.
+   */
+  @Override
+  public CompInstancesUpgradeResponseProto upgrade(
+      CompInstancesUpgradeRequestProto request)
+      throws IOException, YarnException {
+    // Iterating an empty id list is a no-op, so no emptiness guard is needed.
+    for (String idStr : request.getContainerIdsList()) {
+      ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent(
+          ContainerId.fromString(idStr), ComponentInstanceEventType.UPGRADE);
+      LOG.info("Upgrade container {}", idStr);
+      context.scheduler.getDispatcher().getEventHandler().handle(upgradeEvent);
+    }
+    return CompInstancesUpgradeResponseProto.newBuilder().build();
+  }
 }

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.service;
 
+import com.google.common.base.Preconditions;
 import com.google.common.cache.LoadingCache;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
@@ -42,8 +43,17 @@ public class ServiceContext {
   public String principal;
   // AM keytab location
   public String keytab;
+  private ServiceManager serviceManager;
 
   public ServiceContext() {
 
   }
+
+  /**
+   * Returns the service manager registered on this context, or null when
+   * none has been set yet.
+   */
+  public ServiceManager getServiceManager() {
+    return this.serviceManager;
+  }
+
+  /**
+   * Registers the service manager on this context.
+   *
+   * @param serviceManager manager to register; null is rejected by
+   *                       {@code Preconditions.checkNotNull}.
+   */
+  void setServiceManager(ServiceManager serviceManager) {
+    Preconditions.checkNotNull(serviceManager);
+    this.serviceManager = serviceManager;
+  }
 }

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java

@@ -28,6 +28,7 @@ public class ServiceEvent extends AbstractEvent<ServiceEventType> {
 
   private final ServiceEventType type;
   private String version;
+  private boolean autoFinalize;
 
   public ServiceEvent(ServiceEventType serviceEventType) {
     super(serviceEventType);
@@ -46,4 +47,13 @@ public class ServiceEvent extends AbstractEvent<ServiceEventType> {
     this.version = version;
     return this;
   }
+
+  /** Returns the auto-finalize flag carried by this event. */
+  public boolean isAutoFinalize() {
+    return this.autoFinalize;
+  }
+
+  /**
+   * Records the auto-finalize flag on this event.
+   *
+   * @return this event, so that setters can be chained.
+   */
+  public ServiceEvent setAutoFinalize(boolean autoFinalize) {
+    this.autoFinalize = autoFinalize;
+    return this;
+  }
 }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java

@@ -24,5 +24,5 @@ package org.apache.hadoop.yarn.service;
 public enum ServiceEventType {
   START,
   UPGRADE,
-  STOP_UPGRADE
+  CHECK_STABLE // handled by ServiceManager's CheckStableTransition
 }

+ 118 - 38
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java

@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.ServiceState;
+import org.apache.hadoop.yarn.service.component.Component;
 import org.apache.hadoop.yarn.service.component.ComponentEvent;
 import org.apache.hadoop.yarn.service.component.ComponentEventType;
 import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
@@ -39,10 +40,13 @@ import java.io.IOException;
 import java.text.MessageFormat;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser;
+
 /**
- * Manages the state of the service.
+ * Manages the state of Service.
  */
 public class ServiceManager implements EventHandler<ServiceEvent> {
   private static final Logger LOG = LoggerFactory.getLogger(
@@ -56,10 +60,10 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
 
   private final StateMachine<State, ServiceEventType, ServiceEvent>
       stateMachine;
+  private final UpgradeComponentsFinder componentsFinder;
 
   private final AsyncDispatcher dispatcher;
   private final SliderFileSystem fs;
-  private final UpgradeComponentsFinder componentsFinder;
 
   private String upgradeVersion;
 
@@ -72,9 +76,16 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
               State.UPGRADING), ServiceEventType.UPGRADE,
               new StartUpgradeTransition())
 
+          .addTransition(State.STABLE, EnumSet.of(State.STABLE),
+              ServiceEventType.CHECK_STABLE, new CheckStableTransition())
+
           .addTransition(State.UPGRADING, EnumSet.of(State.STABLE,
               State.UPGRADING), ServiceEventType.START,
-              new StopUpgradeTransition())
+              new CheckStableTransition())
+
+          .addTransition(State.UPGRADING, EnumSet.of(State.STABLE,
+              State.UPGRADING), ServiceEventType.CHECK_STABLE,
+              new CheckStableTransition())
           .installTopology();
 
   public ServiceManager(ServiceContext context) {
@@ -102,7 +113,7 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
         stateMachine.doTransition(event.getType(), event);
       } catch (InvalidStateTransitionException e) {
         LOG.error(MessageFormat.format(
+            // MessageFormat indices are zero-based: {0} = event, {1} = state.
             "[SERVICE]: Invalid event {0} at {1}.", event.getType(),
             oldState), e);
       }
       if (oldState != getState()) {
@@ -130,22 +141,11 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
     public State transition(ServiceManager serviceManager,
         ServiceEvent event) {
       try {
-        Service targetSpec = ServiceApiUtil.loadServiceUpgrade(
-            serviceManager.fs, serviceManager.getName(), event.getVersion());
-
-        serviceManager.serviceSpec.setState(ServiceState.UPGRADING);
-        List<org.apache.hadoop.yarn.service.api.records.Component>
-            compsThatNeedUpgrade = serviceManager.componentsFinder.
-            findTargetComponentSpecs(serviceManager.serviceSpec, targetSpec);
-
-        if (compsThatNeedUpgrade != null && !compsThatNeedUpgrade.isEmpty()) {
-          compsThatNeedUpgrade.forEach(component -> {
-            ComponentEvent needUpgradeEvent = new ComponentEvent(
-                component.getName(), ComponentEventType.UPGRADE).
-                setTargetSpec(component);
-            serviceManager.dispatcher.getEventHandler().handle(
-                needUpgradeEvent);
-          });
+        if (!event.isAutoFinalize()) {
+          serviceManager.serviceSpec.setState(ServiceState.UPGRADING);
+        } else {
+          serviceManager.serviceSpec.setState(
+              ServiceState.UPGRADING_AUTO_FINALIZE);
         }
         serviceManager.upgradeVersion = event.getVersion();
         return State.UPGRADING;
@@ -157,22 +157,29 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
     }
   }
 
-  private static class StopUpgradeTransition implements
+  /**
+   * Decides whether the manager can move to (or stay in) STABLE. An
+   * upgrade is finalized only when the spec is in UPGRADING_AUTO_FINALIZE
+   * or the triggering event is START, and every component is stable.
+   */
+  private static class CheckStableTransition implements
       MultipleArcTransition<ServiceManager, ServiceEvent, State> {
 
     @Override
     public State transition(ServiceManager serviceManager,
         ServiceEvent event) {
-      //abort is not supported currently
-      //trigger re-check of service state
-      ServiceMaster.checkAndUpdateServiceState(serviceManager.scheduler,
-          true);
-      if (serviceManager.serviceSpec.getState().equals(ServiceState.STABLE)) {
-        return serviceManager.finalizeUpgrade() ? State.STABLE :
-            State.UPGRADING;
-      } else {
-        return State.UPGRADING;
+      // Snapshot the state recorded on the spec before deciding.
+      ServiceState currState = serviceManager.serviceSpec.getState();
+      if (currState.equals(ServiceState.STABLE)) {
+        return State.STABLE;
       }
+      boolean finalizeRequested =
+          currState.equals(ServiceState.UPGRADING_AUTO_FINALIZE)
+              || event.getType().equals(ServiceEventType.START);
+      if (finalizeRequested
+          && checkIfStable(serviceManager.serviceSpec)
+              .equals(ServiceState.STABLE)
+          && serviceManager.finalizeUpgrade()) {
+        LOG.info("Service def state changed from {} -> {}", currState,
+            serviceManager.serviceSpec.getState());
+        return State.STABLE;
+      }
+      return State.UPGRADING;
     }
   }
 
@@ -181,12 +188,21 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
    */
   private boolean finalizeUpgrade() {
     try {
-      Service upgradeSpec = ServiceApiUtil.loadServiceUpgrade(
+      // save the application id and state to
+      Service targetSpec = ServiceApiUtil.loadServiceUpgrade(
           fs, getName(), upgradeVersion);
-      ServiceApiUtil.writeAppDefinition(fs,
-          ServiceApiUtil.getServiceJsonPath(fs, getName()), upgradeSpec);
+      targetSpec.setId(serviceSpec.getId());
+      targetSpec.setState(ServiceState.STABLE);
+      Map<String, Component> allComps = scheduler.getAllComponents();
+      targetSpec.getComponents().forEach(compSpec -> {
+        Component comp = allComps.get(compSpec.getName());
+        compSpec.setState(comp.getComponentSpec().getState());
+      });
+      jsonSerDeser.save(fs.getFileSystem(),
+          ServiceApiUtil.getServiceJsonPath(fs, getName()), targetSpec, true);
+      fs.deleteClusterUpgradeDir(getName(), upgradeVersion);
     } catch (IOException e) {
-      LOG.error("Upgrade did not complete because unable to overwrite the" +
+      LOG.error("Upgrade did not complete because unable to re-write the" +
           " service definition", e);
       return false;
     }
@@ -195,13 +211,79 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
       fs.deleteClusterUpgradeDir(getName(), upgradeVersion);
     } catch (IOException e) {
       LOG.warn("Unable to delete upgrade definition for service {} " +
-              "version {}", getName(), upgradeVersion);
+          "version {}", getName(), upgradeVersion);
     }
+    serviceSpec.setState(ServiceState.STABLE);
     serviceSpec.setVersion(upgradeVersion);
     upgradeVersion = null;
     return true;
   }
 
+  /**
+   * Reports STABLE when every component spec of the service is STABLE;
+   * otherwise returns the state currently recorded on the service.
+   */
+  private static ServiceState checkIfStable(Service service) {
+    boolean allCompsStable = service.getComponents().stream().allMatch(
+        compSpec -> compSpec.getState().equals(
+            org.apache.hadoop.yarn.service.api.records.ComponentState
+                .STABLE));
+    return allCompsStable ? ServiceState.STABLE : service.getState();
+  }
+
+  /**
+   * Re-evaluates the service state via
+   * ServiceMaster.checkAndUpdateServiceState unless this manager is in the
+   * UPGRADING state. ServiceMaster and Component currently modify the
+   * service state directly, which is a problem for upgrade and flexing;
+   * routing the call through here makes that easier to fix in future.
+   */
+  public void checkAndUpdateServiceState(boolean isIncrement) {
+    writeLock.lock();
+    try {
+      if (getState().equals(State.UPGRADING)) {
+        // Leave the service state untouched while an upgrade is running.
+        return;
+      }
+      ServiceMaster.checkAndUpdateServiceState(this.scheduler, isIncrement);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Starts an upgrade of the service to the given version. Loads the
+   * target spec for that version, dispatches the service-level UPGRADE
+   * event, then dispatches an UPGRADE event for every component returned by
+   * the components finder. When no component needs an upgrade and
+   * auto-finalize was requested, a CHECK_STABLE event is dispatched instead.
+   *
+   * @param upgradeVersion version of the upgrade spec to load and apply.
+   * @param autoFinalize   whether the upgrade was requested with
+   *                       auto-finalize.
+   * @throws IOException propagated from loading the upgrade spec.
+   */
+  void processUpgradeRequest(String upgradeVersion,
+      boolean autoFinalize) throws IOException {
+    Service targetSpec = ServiceApiUtil.loadServiceUpgrade(
+        context.fs, context.service.getName(), upgradeVersion);
+
+    List<org.apache.hadoop.yarn.service.api.records.Component>
+        compsThatNeedUpgrade = componentsFinder.
+        findTargetComponentSpecs(context.service, targetSpec);
+    // The event is fully populated before it is dispatched; it must not be
+    // mutated once it has been handed to the dispatcher.
+    ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE)
+        .setVersion(upgradeVersion)
+        .setAutoFinalize(autoFinalize);
+    context.scheduler.getDispatcher().getEventHandler().handle(event);
+
+    if (compsThatNeedUpgrade != null && !compsThatNeedUpgrade.isEmpty()) {
+      compsThatNeedUpgrade.forEach(component -> {
+        ComponentEvent needUpgradeEvent = new ComponentEvent(
+            component.getName(), ComponentEventType.UPGRADE)
+            .setTargetSpec(component)
+            .setUpgradeVersion(upgradeVersion);
+        context.scheduler.getDispatcher().getEventHandler().handle(
+            needUpgradeEvent);
+      });
+    } else if (autoFinalize) {
+      // nothing to upgrade; with auto finalize requested, trigger a state
+      // check.
+      context.scheduler.getDispatcher().getEventHandler().handle(
+          new ServiceEvent(ServiceEventType.CHECK_STABLE));
+    }
+  }
+
   /**
    * Returns the name of the service.
    */
@@ -216,10 +298,8 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
     STABLE, UPGRADING
   }
 
-
   @VisibleForTesting
   Service getServiceSpec() {
     return serviceSpec;
   }
-
 }

+ 27 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java

@@ -329,6 +329,7 @@ public class ServiceScheduler extends CompositeService {
     // Since AM has been started and registered, the service is in STARTED state
     app.setState(ServiceState.STARTED);
     serviceManager = new ServiceManager(context);
+    context.setServiceManager(serviceManager);
 
     // recover components based on containers sent from RM
     recoverComponents(response);
@@ -757,6 +758,32 @@ public class ServiceScheduler extends CompositeService {
       // automatically which will trigger stopping COMPONENT INSTANCE
     }
 
+    /**
+     * Dispatches a BECOME_READY event for the re-initialized container,
+     * provided a live component instance is registered for it.
+     */
+    @Override
+    public void onContainerReInitialize(ContainerId containerId) {
+      if (liveInstances.get(containerId) == null) {
+        LOG.error("No component instance exists for {}", containerId);
+        return;
+      }
+      dispatcher.getEventHandler().handle(new ComponentInstanceEvent(
+          containerId, ComponentInstanceEventType.BECOME_READY));
+    }
+
+    /**
+     * Handles a failed container re-initialization by logging the cause and
+     * dispatching a CONTAINER_COMPLETED event to the owning component,
+     * provided a live component instance is registered for the container.
+     */
+    @Override
+    public void onContainerReInitializeError(ContainerId containerId,
+        Throwable t) {
+      ComponentInstance instance = liveInstances.get(containerId);
+      if (instance == null) {
+        LOG.error("No component instance exists for {}", containerId, t);
+        return;
+      }
+      // Record why the re-initialization failed; the event below does not
+      // carry the cause.
+      LOG.error("Failed to re-initialize container {}", containerId, t);
+      ComponentEvent event = new ComponentEvent(instance.getCompName(),
+          ComponentEventType.CONTAINER_COMPLETED)
+          .setInstance(instance).setContainerId(containerId);
+      dispatcher.getEventHandler().handle(event);
+    }
+
     @Override public void onContainerResourceIncreased(ContainerId containerId,
         Resource resource) {
 

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java

@@ -250,6 +250,15 @@ public class Component implements Serializable {
     return null;
   }
 
+  /**
+   * Looks up a container of this component by its component instance name.
+   *
+   * @param compInstanceName instance name to look for.
+   * @return the first container with that instance name, or null if none
+   *         matches.
+   */
+  public Container getComponentInstance(String compInstanceName) {
+    return containers.stream()
+        .filter(candidate -> compInstanceName.equals(
+            candidate.getComponentInstanceName()))
+        .findFirst()
+        .orElse(null);
+  }
+
   /**
    * Run all containers of this component in privileged mode (YARN-4262).
    **/
@@ -441,4 +450,16 @@ public class Component implements Serializable {
       this.setReadinessCheck(that.getReadinessCheck());
     }
   }
+
+  /**
+   * Unconditionally copies the artifact, resource, container count, launch
+   * command, configuration, privileged flag, dependencies, placement policy
+   * and readiness check from {@code that} onto this component.
+   */
+  public void overwrite(Component that) {
+    // What to run and with which resources.
+    this.setArtifact(that.getArtifact());
+    this.setResource(that.resource);
+    this.setNumberOfContainers(that.getNumberOfContainers());
+    this.setLaunchCommand(that.getLaunchCommand());
+    // How it is configured and scheduled.
+    this.setConfiguration(that.configuration);
+    this.setRunPrivilegedContainer(that.getRunPrivilegedContainer());
+    this.setDependencies(that.getDependencies());
+    this.setPlacementPolicy(that.getPlacementPolicy());
+    this.setReadinessCheck(that.getReadinessCheck());
+  }
 }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java

@@ -26,5 +26,5 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
 public enum ContainerState {
-  RUNNING_BUT_UNREADY, READY, STOPPED
+  RUNNING_BUT_UNREADY, READY, STOPPED, NEEDS_UPGRADE, UPGRADING;
 }

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java

@@ -29,5 +29,6 @@ import org.apache.hadoop.classification.InterfaceStability;
 @ApiModel(description = "The current state of an service.")
 @javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00")
 public enum ServiceState {
-  ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING;
+  ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING,
+  UPGRADING_AUTO_FINALIZE;
 }

+ 82 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java

@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.client.util.YarnClientUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
@@ -59,8 +60,10 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
 import org.apache.hadoop.yarn.service.ClientAMProtocol;
 import org.apache.hadoop.yarn.service.ServiceMaster;
+import org.apache.hadoop.yarn.service.api.records.Container;
 import org.apache.hadoop.yarn.service.api.records.Component;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.ServiceState;
@@ -206,15 +209,21 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
   }
 
   @Override
-  public int actionUpgrade(String appName, String fileName)
+  public int initiateUpgrade(String appName, String fileName,
+      boolean autoFinalize)
       throws IOException, YarnException {
-    checkAppExistOnHdfs(appName);
     Service upgradeService = loadAppJsonFromLocalFS(fileName, appName,
         null, null);
-    return actionUpgrade(upgradeService);
+    // The requested finalization mode is recorded as the spec's state.
+    ServiceState upgradeState = autoFinalize
+        ? ServiceState.UPGRADING_AUTO_FINALIZE : ServiceState.UPGRADING;
+    upgradeService.setState(upgradeState);
+    return initiateUpgrade(upgradeService);
   }
 
-  public int actionUpgrade(Service service) throws YarnException, IOException {
+  public int initiateUpgrade(Service service) throws YarnException,
+      IOException {
     Service persistedService =
         ServiceApiUtil.loadService(fs, service.getName());
     if (!StringUtils.isEmpty(persistedService.getId())) {
@@ -231,6 +240,15 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
       throw new YarnException(message);
     }
 
+    // An upgrade may only start from a STABLE live service.
+    Service liveService = getStatus(service.getName());
+    if (!liveService.getState().equals(ServiceState.STABLE)) {
+      String message = service.getName() + " is at " +
+          liveService.getState()
+          + " state, upgrade can only be invoked when service is STABLE.";
+      LOG.error(message);
+      throw new YarnException(message);
+    }
+
     Path serviceUpgradeDir = checkAppNotExistOnHdfs(service, true);
     ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
     ServiceApiUtil.createDirAndPersistApp(fs, serviceUpgradeDir, service);
@@ -245,8 +263,56 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
     UpgradeServiceRequestProto.Builder requestBuilder =
         UpgradeServiceRequestProto.newBuilder();
     requestBuilder.setVersion(service.getVersion());
+    if (service.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) {
+      requestBuilder.setAutoFinalize(true);
+    }
+    UpgradeServiceResponseProto responseProto = proxy.upgrade(
+        requestBuilder.build());
+    if (responseProto.hasError()) {
+      LOG.error("Service {} upgrade to version {} failed because {}",
+          service.getName(), service.getVersion(), responseProto.getError());
+      throw new YarnException("Failed to upgrade service " + service.getName()
+          + " to version " + service.getVersion() + " because " +
+          responseProto.getError());
+    }
+    return EXIT_SUCCESS;
+  }
+
+  /**
+   * Upgrades the listed component instances of a service.
+   * <p>
+   * Loads the persisted service definition, resolves the given instance
+   * names to their live containers and delegates to
+   * {@link #actionUpgrade(Service, List)}.
+   *
+   * @param appName name of the service
+   * @param componentInstances names of the component instances to upgrade
+   * @return the exit code returned by {@code actionUpgrade}
+   * @throws IOException propagated from the calls made here
+   * @throws YarnException propagated from the calls made here
+   */
+  @Override
+  public int actionUpgradeInstances(String appName,
+      List<String> componentInstances) throws IOException, YarnException {
+    checkAppExistOnHdfs(appName);
+    Service persisted = ServiceApiUtil.loadService(fs, appName);
+    List<Container> targets =
+        ServiceApiUtil.getLiveContainers(persisted, componentInstances);
+    return actionUpgrade(persisted, targets);
+  }
 
-    proxy.upgrade(requestBuilder.build());
+  /**
+   * Asks the service AM to upgrade the given containers.
+   * <p>
+   * The request is only sent when the YARN application is RUNNING and its
+   * report carries a non-empty AM host.
+   *
+   * @param service service whose instances are upgraded
+   * @param compInstances containers whose ids are sent to the AM
+   * @return {@code EXIT_SUCCESS} once the request has been sent
+   * @throws IOException propagated from the YARN client or the AM proxy
+   * @throws YarnException if the application is not RUNNING, the AM host is
+   *     empty, or a call made here fails
+   */
+  public int actionUpgrade(Service service, List<Container> compInstances)
+      throws IOException, YarnException {
+    ApplicationReport report =
+        yarnClient.getApplicationReport(getAppId(service.getName()));
+
+    if (report.getYarnApplicationState() != RUNNING) {
+      String message = service.getName() + " is at "
+          + report.getYarnApplicationState()
+          + " state, upgrade can only be invoked when service is running.";
+      LOG.error(message);
+      throw new YarnException(message);
+    }
+    if (StringUtils.isEmpty(report.getHost())) {
+      throw new YarnException(service.getName() + " AM hostname is empty.");
+    }
+    ClientAMProtocol amProxy = createAMProxy(service.getName(), report);
+
+    // Only the container ids travel in the request.
+    List<String> containerIds = new ArrayList<>();
+    for (Container instance : compInstances) {
+      containerIds.add(instance.getId());
+    }
+    LOG.info("instances to upgrade {}", containerIds);
+    CompInstancesUpgradeRequestProto.Builder request =
+        CompInstancesUpgradeRequestProto.newBuilder();
+    request.addAllContainerIds(containerIds);
+    amProxy.upgrade(request.build());
     return EXIT_SUCCESS;
   }
 
@@ -391,6 +457,17 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
       LOG.error(message);
       throw new YarnException(message);
     }
+
+    Service liveService = getStatus(serviceName);
+    if (liveService.getState().equals(ServiceState.UPGRADING) ||
+        liveService.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) {
+      String message = serviceName + " is at " +
+          liveService.getState()
+          + " state, flex can not be invoked when service is upgrading. ";
+      LOG.error(message);
+      throw new YarnException(message);
+    }
+
     if (StringUtils.isEmpty(appReport.getHost())) {
       throw new YarnException(serviceName + " AM hostname is empty");
     }

+ 128 - 25
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java

@@ -34,20 +34,23 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.ServiceEvent;
+import org.apache.hadoop.yarn.service.ServiceEventType;
+import org.apache.hadoop.yarn.service.api.records.ContainerState;
+import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
 import org.apache.hadoop.yarn.service.ContainerFailureTracker;
 import org.apache.hadoop.yarn.service.ServiceContext;
-import org.apache.hadoop.yarn.service.ServiceMaster;
 import org.apache.hadoop.yarn.service.ServiceMetrics;
 import org.apache.hadoop.yarn.service.ServiceScheduler;
 import org.apache.hadoop.yarn.service.api.records.PlacementPolicy;
-import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.service.api.records.ServiceState;
-import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
-import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
 import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
 import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils;
 import org.apache.hadoop.yarn.service.monitor.probe.Probe;
+import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
 import org.apache.hadoop.yarn.service.provider.ProviderUtils;
 import org.apache.hadoop.yarn.service.utils.ServiceUtils;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
@@ -70,6 +73,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -109,6 +113,10 @@ public class Component implements EventHandler<ComponentEvent> {
   // disk_failed containers etc. This will be reset to 0 periodically.
   public AtomicInteger currentContainerFailure = new AtomicInteger(0);
 
+  private AtomicBoolean upgradeInProgress = new AtomicBoolean(false);
+  private ComponentEvent upgradeEvent;
+  private AtomicLong numContainersThatNeedUpgrade = new AtomicLong(0);
+
   private StateMachine<ComponentState, ComponentEventType, ComponentEvent>
       stateMachine;
   private AsyncDispatcher dispatcher;
@@ -131,7 +139,7 @@ public class Component implements EventHandler<ComponentEvent> {
           .addTransition(FLEXING, FLEXING, CONTAINER_ALLOCATED,
               new ContainerAllocatedTransition())
           // container launched on NM
-          .addTransition(FLEXING, EnumSet.of(STABLE, FLEXING),
+          .addTransition(FLEXING, EnumSet.of(STABLE, FLEXING, UPGRADING),
               CONTAINER_STARTED, new ContainerStartedTransition())
           // container failed while flexing
           .addTransition(FLEXING, FLEXING, CONTAINER_COMPLETED,
@@ -151,12 +159,19 @@ public class Component implements EventHandler<ComponentEvent> {
           // For flex down, go to STABLE state
           .addTransition(STABLE, EnumSet.of(STABLE, FLEXING),
               FLEX, new FlexComponentTransition())
-          .addTransition(STABLE, UPGRADING, UPGRADE,
-              new ComponentNeedsUpgradeTransition())
-          .addTransition(FLEXING, UPGRADING, UPGRADE,
+          .addTransition(STABLE, UPGRADING, ComponentEventType.UPGRADE,
               new ComponentNeedsUpgradeTransition())
-          .addTransition(UPGRADING, UPGRADING, UPGRADE,
+          //Upgrade while previous upgrade is still in progress
+          .addTransition(UPGRADING, UPGRADING, ComponentEventType.UPGRADE,
               new ComponentNeedsUpgradeTransition())
+          .addTransition(UPGRADING, EnumSet.of(UPGRADING, FLEXING, STABLE),
+              CHECK_STABLE, new CheckStableTransition())
+          .addTransition(FLEXING, EnumSet.of(UPGRADING, FLEXING, STABLE),
+              CHECK_STABLE, new CheckStableTransition())
+          .addTransition(STABLE, EnumSet.of(STABLE), CHECK_STABLE,
+              new CheckStableTransition())
+          .addTransition(UPGRADING, FLEXING, CONTAINER_COMPLETED,
+              new ContainerCompletedTransition())
           .installTopology();
 
   public Component(
@@ -291,7 +306,10 @@ public class Component implements EventHandler<ComponentEvent> {
 
       component.pendingInstances.remove(instance);
       instance.setContainer(container);
-      ProviderUtils.initCompInstanceDir(component.getContext().fs, instance);
+
+      ProviderUtils.initCompInstanceDir(component.getContext().fs,
+          component.createLaunchContext(component.componentSpec,
+              component.scheduler.getApp().getVersion()), instance);
       component.getScheduler().addLiveCompInstance(container.getId(), instance);
       LOG.info("[COMPONENT {}]: Recovered {} for component instance {} on " +
               "host {}, num pending component instances reduced to {} ",
@@ -317,14 +335,21 @@ public class Component implements EventHandler<ComponentEvent> {
   private static ComponentState checkIfStable(Component component) {
     // if desired == running
     if (component.componentMetrics.containersReady.value() == component
-        .getComponentSpec().getNumberOfContainers()) {
+        .getComponentSpec().getNumberOfContainers() &&
+        component.numContainersThatNeedUpgrade.get() == 0) {
       component.componentSpec.setState(
           org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
       return STABLE;
-    } else {
+    } else if (component.componentMetrics.containersReady.value() != component
+        .getComponentSpec().getNumberOfContainers()) {
       component.componentSpec.setState(
           org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
       return FLEXING;
+    } else {
+      //  component.numContainersThatNeedUpgrade.get() > 0
+      component.componentSpec.setState(org.apache.hadoop.yarn.service.api.
+          records.ComponentState.NEEDS_UPGRADE);
+      return UPGRADING;
     }
   }
 
@@ -336,8 +361,9 @@ public class Component implements EventHandler<ComponentEvent> {
         component.componentSpec.getState();
     if (isIncrement) {
       // check if all containers are in READY state
-      if (component.componentMetrics.containersReady
-          .value() == component.componentMetrics.containersDesired.value()) {
+      if (component.numContainersThatNeedUpgrade.get() == 0 &&
+          component.componentMetrics.containersReady.value() ==
+              component.componentMetrics.containersDesired.value()) {
         component.componentSpec.setState(
             org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
         if (curState != component.componentSpec.getState()) {
@@ -346,8 +372,7 @@ public class Component implements EventHandler<ComponentEvent> {
               component.componentSpec.getState());
         }
         // component state change will trigger re-check of service state
-        ServiceMaster.checkAndUpdateServiceState(component.scheduler,
-            isIncrement);
+        component.context.getServiceManager().checkAndUpdateServiceState(true);
       }
     } else {
       // container moving out of READY state could be because of FLEX down so
@@ -362,10 +387,13 @@ public class Component implements EventHandler<ComponentEvent> {
               component.componentSpec.getState());
         }
         // component state change will trigger re-check of service state
-        ServiceMaster.checkAndUpdateServiceState(component.scheduler,
-            isIncrement);
+        component.context.getServiceManager().checkAndUpdateServiceState(false);
       }
     }
+    // when the service is stable then the state of component needs to
+    // transition to stable
+    component.dispatcher.getEventHandler().handle(new ComponentEvent(
+        component.getName(), ComponentEventType.CHECK_STABLE));
   }
 
   private static class ContainerCompletedTransition extends BaseTransition {
@@ -377,15 +405,52 @@ public class Component implements EventHandler<ComponentEvent> {
               STOP).setStatus(event.getStatus()));
       component.componentSpec.setState(
           org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
-      component.getScheduler().getApp().setState(ServiceState.STARTED);
+      if (component.context.service.getState().equals(ServiceState.STABLE)) {
+        component.getScheduler().getApp().setState(ServiceState.STARTED);
+        LOG.info("Service def state changed from {} -> {}",
+            ServiceState.STABLE, ServiceState.STARTED);
+      }
     }
   }
 
   private static class ComponentNeedsUpgradeTransition extends BaseTransition {
     @Override // marks the component and each of its containers NEEDS_UPGRADE
     public void transition(Component component, ComponentEvent event) {
+      component.upgradeInProgress.set(true); // reset in CheckStableTransition
       component.componentSpec.setState(org.apache.hadoop.yarn.service.api.
           records.ComponentState.NEEDS_UPGRADE);
+      component.numContainersThatNeedUpgrade.set(
+          component.componentSpec.getNumberOfContainers());
+      component.componentSpec.getContainers().forEach(container ->
+          container.setState(ContainerState.NEEDS_UPGRADE));
+      component.upgradeEvent = event; // read back via getUpgradeEvent()
+    }
+  }
+
+  private static class CheckStableTransition implements MultipleArcTransition
+      <Component, ComponentEvent, ComponentState> {
+
+    @Override
+    public ComponentState transition(Component component,
+        ComponentEvent componentEvent) {
+      org.apache.hadoop.yarn.service.api.records.ComponentState currState =
+          component.componentSpec.getState();
+      if (currState.equals(org.apache.hadoop.yarn.service.api.records
+          .ComponentState.STABLE)) {
+        return ComponentState.STABLE;
+      }
+      // checkIfStable also updates the state in definition when STABLE
+      ComponentState targetState = checkIfStable(component);
+      if (targetState.equals(STABLE) && component.upgradeInProgress.get()) {
+        component.componentSpec.overwrite(
+            component.upgradeEvent.getTargetSpec());
+        component.upgradeEvent = null;
+        ServiceEvent checkStable = new ServiceEvent(ServiceEventType.
+            CHECK_STABLE);
+        component.dispatcher.getEventHandler().handle(checkStable);
+        component.upgradeInProgress.set(false);
+      }
+      return targetState;
     }
   }
 
@@ -421,8 +486,28 @@ public class Component implements EventHandler<ComponentEvent> {
         "[COMPONENT {}]: Assigned {} to component instance {} and launch on host {} ",
         getName(), container.getId(), instance.getCompInstanceName(),
         container.getNodeId());
-    scheduler.getContainerLaunchService()
-        .launchCompInstance(scheduler.getApp(), instance, container);
+    if (upgradeInProgress.get()) {
+      scheduler.getContainerLaunchService()
+          .launchCompInstance(scheduler.getApp(), instance, container,
+              createLaunchContext(upgradeEvent.getTargetSpec(),
+                  upgradeEvent.getUpgradeVersion()));
+    } else {
+      scheduler.getContainerLaunchService().launchCompInstance(
+          scheduler.getApp(), instance, container,
+          createLaunchContext(componentSpec, scheduler.getApp().getVersion()));
+    }
+  }
+
+  /**
+   * Builds a launch context from a component spec and a service version.
+   *
+   * @param compSpec spec supplying the name, artifact, configuration and
+   *     launch command
+   * @param version service version recorded in the context
+   * @return the populated launch context
+   */
+  public ContainerLaunchService.ComponentLaunchContext createLaunchContext(
+      org.apache.hadoop.yarn.service.api.records.Component compSpec,
+      String version) {
+    return new ContainerLaunchService.ComponentLaunchContext(
+        compSpec.getName(), version)
+        .setArtifact(compSpec.getArtifact())
+        .setConfiguration(compSpec.getConfiguration())
+        .setLaunchCommand(compSpec.getLaunchCommand());
   }
 
   @SuppressWarnings({ "unchecked" })
@@ -661,16 +746,24 @@ public class Component implements EventHandler<ComponentEvent> {
     scheduler.getServiceMetrics().containersRunning.decr();
   }
 
-  public void incContainersReady() {
+  public void incContainersReady(boolean updateDefinition) {
     componentMetrics.containersReady.incr();
     scheduler.getServiceMetrics().containersReady.incr();
-    checkAndUpdateComponentState(this, true);
+    if (updateDefinition) {
+      checkAndUpdateComponentState(this, true);
+    }
   }
 
-  public void decContainersReady() {
+  public void decContainersReady(boolean updateDefinition) {
     componentMetrics.containersReady.decr();
     scheduler.getServiceMetrics().containersReady.decr();
-    checkAndUpdateComponentState(this, false);
+    if (updateDefinition) {
+      checkAndUpdateComponentState(this, false);
+    }
+  }
+
+  public void decContainersThatNeedUpgrade() {
+    numContainersThatNeedUpgrade.decrementAndGet(); // checkIfStable needs 0
+  }
 
   public int getNumReadyInstances() {
@@ -729,6 +822,16 @@ public class Component implements EventHandler<ComponentEvent> {
       this.readLock.unlock();
     }
   }
+
+  public ComponentEvent getUpgradeEvent() {
+    this.readLock.lock(); // read under the read lock
+    try {
+      return upgradeEvent; // null once CheckStableTransition has cleared it
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
   public ServiceScheduler getScheduler() {
     return scheduler;
   }

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java

@@ -34,6 +34,7 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> {
   private ContainerStatus status;
   private ContainerId containerId;
   private org.apache.hadoop.yarn.service.api.records.Component targetSpec;
+  private String upgradeVersion;
 
   public ContainerId getContainerId() {
     return containerId;
@@ -103,4 +104,13 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> {
     this.targetSpec = Preconditions.checkNotNull(targetSpec);
     return this;
   }
+
+  public String getUpgradeVersion() {
+    return upgradeVersion; // whatever setUpgradeVersion last stored
+  }
+
+  public ComponentEvent setUpgradeVersion(String upgradeVersion) {
+    this.upgradeVersion = upgradeVersion; // stored as-is, no null check
+    return this; // returns this event so calls can be chained
+  }
 }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java

@@ -25,5 +25,5 @@ public enum ComponentEventType {
   CONTAINER_STARTED,
   CONTAINER_COMPLETED,
   UPGRADE,
-  STOP_UPGRADE
+  CHECK_STABLE
 }

+ 43 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java

@@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.service.ServiceScheduler;
 import org.apache.hadoop.yarn.service.api.records.ContainerState;
 import org.apache.hadoop.yarn.service.component.Component;
+import org.apache.hadoop.yarn.service.component.ComponentEvent;
+import org.apache.hadoop.yarn.service.component.ComponentEventType;
 import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus;
 import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
 import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
@@ -116,10 +118,15 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
       .addTransition(READY, STARTED, BECOME_NOT_READY,
           new ContainerBecomeNotReadyTransition())
       .addTransition(READY, INIT, STOP, new ContainerStoppedTransition())
+      .addTransition(READY, UPGRADING, UPGRADE,
+          new ContainerUpgradeTransition())
+      .addTransition(UPGRADING, UPGRADING, UPGRADE,
+          new ContainerUpgradeTransition())
+      .addTransition(UPGRADING, READY, BECOME_READY,
+          new ContainerBecomeReadyTransition())
+      .addTransition(UPGRADING, INIT, STOP, new ContainerStoppedTransition())
       .installTopology();
 
-
-
   public ComponentInstance(Component component,
       ComponentInstanceId compInstanceId) {
     this.stateMachine = stateMachineFactory.make(this);
@@ -186,7 +193,17 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
     public void transition(ComponentInstance compInstance,
         ComponentInstanceEvent event) {
       compInstance.containerSpec.setState(ContainerState.READY);
-      compInstance.component.incContainersReady();
+      if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) {
+        compInstance.component.incContainersReady(false);
+        compInstance.component.decContainersThatNeedUpgrade();
+        ComponentEvent checkState = new ComponentEvent(
+            compInstance.component.getName(), ComponentEventType.CHECK_STABLE);
+        compInstance.scheduler.getDispatcher().getEventHandler().handle(
+            checkState);
+
+      } else {
+        compInstance.component.incContainersReady(true);
+      }
       if (compInstance.timelineServiceEnabled) {
         compInstance.serviceTimelinePublisher
             .componentInstanceBecomeReady(compInstance.containerSpec);
@@ -199,7 +216,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
     public void transition(ComponentInstance compInstance,
         ComponentInstanceEvent event) {
       compInstance.containerSpec.setState(ContainerState.RUNNING_BUT_UNREADY);
-      compInstance.component.decContainersReady();
+      compInstance.component.decContainersReady(true);
     }
   }
 
@@ -225,9 +242,11 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
               .getDiagnostics();
       compInstance.diagnostics.append(containerDiag + System.lineSeparator());
       compInstance.cancelContainerStatusRetriever();
-
+      if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) {
+        compInstance.component.decContainersThatNeedUpgrade();
+      }
       if (compInstance.getState().equals(READY)) {
-        compInstance.component.decContainersReady();
+        compInstance.component.decContainersReady(true);
       }
       compInstance.component.decRunningContainers();
       boolean shouldExit = false;
@@ -287,6 +306,23 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
     }
   }
 
+  /**
+   * Handles UPGRADE for a component instance: marks the container spec
+   * UPGRADING, takes the instance out of the component's ready count without
+   * re-checking the component definition, and asks the launch service to
+   * re-initialize the container from the component's stored upgrade event.
+   */
+  private static class ContainerUpgradeTransition extends BaseTransition {
+
+    @Override
+    public void transition(ComponentInstance compInstance,
+        ComponentInstanceEvent event) {
+      compInstance.containerSpec.setState(ContainerState.UPGRADING);
+      compInstance.component.decContainersReady(false);
+      // Target spec and version both come from the component's upgrade event.
+      ComponentEvent pendingUpgrade =
+          compInstance.component.getUpgradeEvent();
+      compInstance.scheduler.getContainerLaunchService().reInitCompInstance(
+          compInstance.scheduler.getApp(),
+          compInstance,
+          compInstance.container,
+          compInstance.component.createLaunchContext(
+              pendingUpgrade.getTargetSpec(),
+              pendingUpgrade.getUpgradeVersion()));
+    }
+  }
+
   public ComponentInstanceState getState() {
     this.readLock.lock();
 
@@ -422,7 +458,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
       component.decRunningContainers();
     }
     if (getState() == READY) {
-      component.decContainersReady();
+      component.decContainersReady(true);
       component.decRunningContainers();
     }
     getCompSpec().removeContainer(containerSpec);

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java

@@ -22,5 +22,6 @@ public enum ComponentInstanceEventType {
   START,
   STOP,
   BECOME_READY,
-  BECOME_NOT_READY
+  BECOME_NOT_READY,
+  UPGRADE
 }

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java

@@ -17,6 +17,8 @@
 
 package org.apache.hadoop.yarn.service.conf;
 
+import javax.ws.rs.core.MediaType;
+
 public interface RestApiConstants {
 
   // Rest endpoints
@@ -26,9 +28,19 @@ public interface RestApiConstants {
   String SERVICE_PATH = "/services/{service_name}";
   String COMPONENT_PATH = "/services/{service_name}/components/{component_name}";
 
+  String COMP_INSTANCE_PATH = SERVICE_PATH +
+      "/component-instances/{component_instance_name}";
+  String COMP_INSTANCE_LONG_PATH = COMPONENT_PATH +
+      "/component-instances/{component_instance_name}";
+  String COMP_INSTANCES = "component-instances";
+  String COMP_INSTANCES_PATH = SERVICE_PATH + "/" + COMP_INSTANCES;
+
   // Query param
   String SERVICE_NAME = "service_name";
   String COMPONENT_NAME = "component_name";
+  String COMP_INSTANCE_NAME = "component_instance_name";
+
+  String MEDIA_TYPE_JSON_UTF8 = MediaType.APPLICATION_JSON + ";charset=utf-8";
 
   Long DEFAULT_UNLIMITED_LIFETIME = -1l;
 

+ 88 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java

@@ -18,11 +18,12 @@
 
 package org.apache.hadoop.yarn.service.containerlaunch;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.service.ServiceContext;
-import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.Artifact;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
 import org.apache.hadoop.yarn.service.provider.ProviderService;
 import org.apache.hadoop.yarn.service.provider.ProviderFactory;
@@ -63,36 +64,57 @@ public class ContainerLaunchService extends AbstractService{
   }
 
   public void launchCompInstance(Service service,
-      ComponentInstance instance, Container container) {
+      ComponentInstance instance, Container container,
+      ComponentLaunchContext componentLaunchContext) {
     ContainerLauncher launcher =
-        new ContainerLauncher(service, instance, container);
+        new ContainerLauncher(service, instance, container,
+            componentLaunchContext, false);
     executorService.execute(launcher);
   }
 
+  /**
+   * Queues a re-initialization of an existing container on the executor.
+   *
+   * @param service service the instance belongs to
+   * @param instance component instance being re-initialized
+   * @param container container to re-initialize
+   * @param componentLaunchContext launch context to build the container from
+   */
+  public void reInitCompInstance(Service service,
+      ComponentInstance instance, Container container,
+      ComponentLaunchContext componentLaunchContext) {
+    // The trailing 'true' selects the re-initialize path in ContainerLauncher.
+    executorService.execute(new ContainerLauncher(service, instance,
+        container, componentLaunchContext, true));
+  }
+
   private class ContainerLauncher implements Runnable {
     public final Container container;
     public final Service service;
     public ComponentInstance instance;
+    private final ComponentLaunchContext componentLaunchContext;
+    private final boolean reInit;
 
-    public ContainerLauncher(
-        Service service,
-        ComponentInstance instance, Container container) {
+    ContainerLauncher(Service service, ComponentInstance instance,
+        Container container, ComponentLaunchContext componentLaunchContext,
+        boolean reInit) {
       this.container = container;
       this.service = service;
       this.instance = instance;
+      this.componentLaunchContext = componentLaunchContext;
+      this.reInit = reInit;
     }
 
     @Override public void run() {
-      Component compSpec = instance.getCompSpec();
       ProviderService provider = ProviderFactory.getProviderService(
-          compSpec.getArtifact());
+          componentLaunchContext.getArtifact());
       AbstractLauncher launcher = new AbstractLauncher(context);
       try {
         provider.buildContainerLaunchContext(launcher, service,
-            instance, fs, getConfig(), container);
-        instance.getComponent().getScheduler().getNmClient()
-            .startContainerAsync(container,
-                launcher.completeContainerLaunch());
+            instance, fs, getConfig(), container, componentLaunchContext);
+        if (!reInit) {
+          LOG.info("launching container {}", container.getId());
+          instance.getComponent().getScheduler().getNmClient()
+              .startContainerAsync(container,
+                  launcher.completeContainerLaunch());
+        } else {
+          LOG.info("reInitializing container {}", container.getId());
+          instance.getComponent().getScheduler().getNmClient()
+              .reInitializeContainerAsync(container.getId(),
+                  launcher.completeContainerLaunch(), true);
+        }
       } catch (Exception e) {
         LOG.error(instance.getCompInstanceId()
             + ": Failed to launch container. ", e);
@@ -100,4 +122,58 @@ public class ContainerLaunchService extends AbstractService{
       }
     }
   }
+
+  /**
+   * Launch context of a component.
+   * <p>
+   * The component name and service version are fixed at construction and
+   * must not be null; artifact, configuration and launch command are set
+   * afterwards through chainable setters.
+   */
+  public static class ComponentLaunchContext {
+    private final String name;
+    private final String serviceVersion;
+    private Artifact artifact;
+    private org.apache.hadoop.yarn.service.api.records.Configuration
+        configuration;
+    private String launchCommand;
+
+    /**
+     * @param name component name, must not be null
+     * @param serviceVersion service version, must not be null
+     */
+    public ComponentLaunchContext(String name, String serviceVersion) {
+      Preconditions.checkNotNull(name);
+      Preconditions.checkNotNull(serviceVersion);
+      this.name = name;
+      this.serviceVersion = serviceVersion;
+    }
+
+    /** @return the component name */
+    public String getName() {
+      return this.name;
+    }
+
+    /** @return the service version */
+    public String getServiceVersion() {
+      return this.serviceVersion;
+    }
+
+    /** @return the artifact, as last set */
+    public Artifact getArtifact() {
+      return this.artifact;
+    }
+
+    /** @return the configuration, as last set */
+    public org.apache.hadoop.yarn.service.api.records.
+        Configuration getConfiguration() {
+      return this.configuration;
+    }
+
+    /** @return the launch command, as last set */
+    public String getLaunchCommand() {
+      return this.launchCommand;
+    }
+
+    /**
+     * @param artifact artifact to store
+     * @return this context, for chaining
+     */
+    public ComponentLaunchContext setArtifact(Artifact artifact) {
+      this.artifact = artifact;
+      return this;
+    }
+
+    /**
+     * @param configuration configuration to store
+     * @return this context, for chaining
+     */
+    public ComponentLaunchContext setConfiguration(org.apache.hadoop.yarn.
+        service.api.records.Configuration configuration) {
+      this.configuration = configuration;
+      return this;
+    }
+
+    /**
+     * @param launchCommand launch command to store
+     * @return this context, for chaining
+     */
+    public ComponentLaunchContext setLaunchCommand(String launchCommand) {
+      this.launchCommand = launchCommand;
+      return this;
+    }
+  }
 }

+ 14 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java

@@ -30,6 +30,8 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
@@ -114,4 +116,16 @@ public class ClientAMProtocolPBClientImpl
     }
     return null;
   }
+
+  @Override
+  public CompInstancesUpgradeResponseProto upgrade(
+      CompInstancesUpgradeRequestProto request)
+      throws IOException, YarnException {
+    try {
+      return proxy.upgrade(null, request); // first argument is left null
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+    }
+    return null; // only reached if unwrapAndThrowException returns normally
+  }
 }

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java

@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.service.impl.pb.service;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
@@ -91,4 +93,14 @@ public class ClientAMProtocolPBServiceImpl implements ClientAMProtocolPB {
       throw new ServiceException(e);
     }
   }
+
+  /**
+   * Server-side entry point for the upgrade rpc: delegates the request to
+   * the wrapped protocol implementation.
+   *
+   * @param controller RPC controller; not used by this method
+   * @param request upgrade request to delegate
+   * @return the response returned by the wrapped implementation
+   * @throws ServiceException wrapping any {@link IOException} or
+   *         {@link YarnException} thrown by the wrapped implementation
+   */
+  @Override
+  public CompInstancesUpgradeResponseProto upgrade(RpcController controller,
+      CompInstancesUpgradeRequestProto request) throws ServiceException {
+    final CompInstancesUpgradeResponseProto response;
+    try {
+      response = real.upgrade(request);
+    } catch (IOException | YarnException failure) {
+      throw new ServiceException(failure);
+    }
+    return response;
+  }
 }

+ 13 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/AbstractProviderService.java

@@ -23,8 +23,8 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
-import org.apache.hadoop.yarn.service.api.records.Component;
 import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
+import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
 import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
 import org.apache.hadoop.yarn.service.utils.ServiceUtils;
 import org.apache.hadoop.yarn.service.exceptions.SliderException;
@@ -60,9 +60,9 @@ public abstract class AbstractProviderService implements ProviderService,
 
   public void buildContainerLaunchContext(AbstractLauncher launcher,
       Service service, ComponentInstance instance,
-      SliderFileSystem fileSystem, Configuration yarnConf, Container container)
+      SliderFileSystem fileSystem, Configuration yarnConf, Container container,
+      ContainerLaunchService.ComponentLaunchContext compLaunchContext)
       throws IOException, SliderException {
-    Component component = instance.getComponent().getComponentSpec();;
     processArtifact(launcher, instance, fileSystem, service);
 
     ServiceContext context =
@@ -72,11 +72,12 @@ public abstract class AbstractProviderService implements ProviderService,
     Map<String, String> globalTokens =
         instance.getComponent().getScheduler().globalTokens;
     Map<String, String> tokensForSubstitution = ProviderUtils
-        .initCompTokensForSubstitute(instance, container);
+        .initCompTokensForSubstitute(instance, container,
+            compLaunchContext);
     tokensForSubstitution.putAll(globalTokens);
     // Set the environment variables in launcher
-    launcher.putEnv(ServiceUtils
-        .buildEnvMap(component.getConfiguration(), tokensForSubstitution));
+    launcher.putEnv(ServiceUtils.buildEnvMap(
+        compLaunchContext.getConfiguration(), tokensForSubstitution));
     launcher.setEnv("WORK_DIR", ApplicationConstants.Environment.PWD.$());
     launcher.setEnv("LOG_DIR", ApplicationConstants.LOG_DIR_EXPANSION_VAR);
     if (System.getenv(HADOOP_USER_NAME) != null) {
@@ -94,10 +95,10 @@ public abstract class AbstractProviderService implements ProviderService,
 
     // create config file on hdfs and add local resource
     ProviderUtils.createConfigFileAndAddLocalResource(launcher, fileSystem,
-        component, tokensForSubstitution, instance, context);
+        compLaunchContext, tokensForSubstitution, instance, context);
 
     // substitute launch command
-    String launchCommand = component.getLaunchCommand();
+    String launchCommand = compLaunchContext.getLaunchCommand();
     // docker container may have empty commands
     if (!StringUtils.isEmpty(launchCommand)) {
       launchCommand = ProviderUtils
@@ -111,12 +112,12 @@ public abstract class AbstractProviderService implements ProviderService,
     // By default retry forever every 30 seconds
     launcher.setRetryContext(
         YarnServiceConf.getInt(CONTAINER_RETRY_MAX, DEFAULT_CONTAINER_RETRY_MAX,
-            component.getConfiguration(), yarnConf),
+            compLaunchContext.getConfiguration(), yarnConf),
         YarnServiceConf.getInt(CONTAINER_RETRY_INTERVAL,
-            DEFAULT_CONTAINER_RETRY_INTERVAL, component.getConfiguration(),
-            yarnConf),
+            DEFAULT_CONTAINER_RETRY_INTERVAL,
+            compLaunchContext.getConfiguration(), yarnConf),
         YarnServiceConf.getLong(CONTAINER_FAILURES_VALIDITY_INTERVAL,
             DEFAULT_CONTAINER_FAILURES_VALIDITY_INTERVAL,
-            component.getConfiguration(), yarnConf));
+            compLaunchContext.getConfiguration(), yarnConf));
   }
 }

+ 5 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderService.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.service.provider;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
 import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
 import org.apache.hadoop.yarn.service.exceptions.SliderException;
 import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
@@ -35,6 +36,8 @@ public interface ProviderService {
    */
   void buildContainerLaunchContext(AbstractLauncher containerLauncher,
       Service service, ComponentInstance instance,
-      SliderFileSystem sliderFileSystem, Configuration yarnConf, Container
-      container) throws IOException, SliderException;
+      SliderFileSystem sliderFileSystem, Configuration yarnConf,
+      Container container,
+      ContainerLaunchService.ComponentLaunchContext componentLaunchContext)
+      throws IOException, SliderException;
 }

+ 18 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java

@@ -27,12 +27,12 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.service.ServiceContext;
-import org.apache.hadoop.yarn.service.api.records.Component;
 import org.apache.hadoop.yarn.service.api.records.ConfigFile;
 import org.apache.hadoop.yarn.service.api.records.ConfigFormat;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
 import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
 import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
+import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
 import org.apache.hadoop.yarn.service.exceptions.BadCommandArgumentsException;
 import org.apache.hadoop.yarn.service.exceptions.SliderException;
 import org.apache.hadoop.yarn.service.utils.PublishedConfiguration;
@@ -51,7 +51,11 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.regex.Pattern;
 
-import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*;
+import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.COMPONENT_ID;
+import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.COMPONENT_INSTANCE_NAME;
+import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.COMPONENT_NAME;
+import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.COMPONENT_NAME_LC;
+import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.CONTAINER_ID;
 
 /**
  * This is a factoring out of methods handy for providers. It's bonded to a log
@@ -160,9 +164,11 @@ public class ProviderUtils implements YarnServiceConstants {
   }
 
   public static Path initCompInstanceDir(SliderFileSystem fs,
+      ContainerLaunchService.ComponentLaunchContext compLaunchContext,
       ComponentInstance instance) {
     Path compDir = new Path(new Path(fs.getAppDir(), "components"),
-        instance.getCompName());
+        compLaunchContext.getServiceVersion() + "/" +
+            compLaunchContext.getName());
     Path compInstanceDir = new Path(compDir, instance.getCompInstanceName());
     instance.setCompInstanceDir(compInstanceDir);
     return compInstanceDir;
@@ -171,10 +177,11 @@ public class ProviderUtils implements YarnServiceConstants {
   // 1. Create all config files for a component on hdfs for localization
   // 2. Add the config file to localResource
   public static synchronized void createConfigFileAndAddLocalResource(
-      AbstractLauncher launcher, SliderFileSystem fs, Component component,
+      AbstractLauncher launcher, SliderFileSystem fs,
+      ContainerLaunchService.ComponentLaunchContext compLaunchContext,
       Map<String, String> tokensForSubstitution, ComponentInstance instance,
       ServiceContext context) throws IOException {
-    Path compInstanceDir = initCompInstanceDir(fs, instance);
+    Path compInstanceDir = initCompInstanceDir(fs, compLaunchContext, instance);
     if (!fs.getFileSystem().exists(compInstanceDir)) {
       log.info(instance.getCompInstanceId() + ": Creating dir on hdfs: " + compInstanceDir);
       fs.getFileSystem().mkdirs(compInstanceDir,
@@ -189,7 +196,8 @@ public class ProviderUtils implements YarnServiceConstants {
           + tokensForSubstitution);
     }
 
-    for (ConfigFile originalFile : component.getConfiguration().getFiles()) {
+    for (ConfigFile originalFile : compLaunchContext.getConfiguration()
+        .getFiles()) {
       ConfigFile configFile = originalFile.copy();
       String fileName = new Path(configFile.getDestFile()).getName();
 
@@ -343,11 +351,12 @@ public class ProviderUtils implements YarnServiceConstants {
    * @return tokens to replace
    */
   public static Map<String, String> initCompTokensForSubstitute(
-      ComponentInstance instance, Container container) {
+      ComponentInstance instance, Container container,
+      ContainerLaunchService.ComponentLaunchContext componentLaunchContext) {
     Map<String, String> tokens = new HashMap<>();
-    tokens.put(COMPONENT_NAME, instance.getCompSpec().getName());
+    tokens.put(COMPONENT_NAME, componentLaunchContext.getName());
     tokens
-        .put(COMPONENT_NAME_LC, instance.getCompSpec().getName().toLowerCase());
+        .put(COMPONENT_NAME_LC, componentLaunchContext.getName().toLowerCase());
     tokens.put(COMPONENT_INSTANCE_NAME, instance.getCompInstanceName());
     tokens.put(CONTAINER_ID, container.getId().toString());
     tokens.put(COMPONENT_ID,

+ 47 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java

@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.service.utils;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -27,12 +29,13 @@ import org.apache.hadoop.registry.client.api.RegistryConstants;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.service.api.records.Container;
+import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.Artifact;
 import org.apache.hadoop.yarn.service.api.records.Component;
 import org.apache.hadoop.yarn.service.api.records.Configuration;
 import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
 import org.apache.hadoop.yarn.service.api.records.Resource;
-import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.exceptions.SliderException;
 import org.apache.hadoop.yarn.service.conf.RestApiConstants;
 import org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages;
@@ -66,6 +69,8 @@ public class ServiceApiUtil {
   private static final PatternValidator userNamePattern
       = new PatternValidator("[a-z][a-z0-9-.]*");
 
+
+
   @VisibleForTesting
   public static void setJsonSerDeser(JsonSerDeser jsd) {
     jsonSerDeser = jsd;
@@ -496,6 +501,47 @@ public class ServiceApiUtil {
     return appJson;
   }
 
+  /**
+   * Collects the containers of {@code service} whose component instance name
+   * is listed in {@code componentInstances}.
+   *
+   * @param service service whose components and containers are searched
+   * @param componentInstances component instance names to look for; the
+   *          component name is taken to be everything before the last '-'
+   * @return the matching containers; instance names that match no container
+   *         contribute nothing to the result
+   * @throws YarnException if an instance name contains no '-'
+   */
+  public static List<Container> getLiveContainers(Service service,
+      List<String> componentInstances)
+      throws YarnException {
+    List<Container> result = new ArrayList<>();
+
+    // In order to avoid iterating over all the containers of all components,
+    // first find the affected components by parsing the instance name.
+    Multimap<String, String> affectedComps = ArrayListMultimap.create();
+    for (String instanceName : componentInstances) {
+      affectedComps.put(
+          ServiceApiUtil.parseComponentName(instanceName), instanceName);
+    }
+
+    service.getComponents().forEach(comp -> {
+      // Iterating once over the containers of the affected component to
+      // find all the containers. Avoiding multiple calls to
+      // service.getComponent(...) and component.getContainer(...) because they
+      // iterate over all the components of the service and all the containers
+      // of the components respectively.
+      //
+      // Multimap.get(...) returns an empty collection, never null, for an
+      // absent key, so membership has to be tested with containsKey(...);
+      // a null check would let every component through.
+      if (affectedComps.containsKey(comp.getName())) {
+        Collection<String> instanceNames = affectedComps.get(comp.getName());
+        comp.getContainers().forEach(container -> {
+          if (instanceNames.contains(container.getComponentInstanceName())) {
+            result.add(container);
+          }
+        });
+      }
+    });
+    return result;
+  }
+
+  private static String parseComponentName(String componentInstanceName)
+      throws YarnException {
+    int idx = componentInstanceName.lastIndexOf('-');
+    if (idx == -1) {
+      throw new YarnException("Invalid component instance (" +
+          componentInstanceName + ") name.");
+    }
+    return componentInstanceName.substring(0, idx);
+  }
+
   public static String $(String s) {
     return "${" + s +"}";
   }

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto

@@ -30,6 +30,8 @@ service ClientAMProtocolService {
     returns (UpgradeServiceResponseProto);
   rpc restartService(RestartServiceRequestProto)
     returns (RestartServiceResponseProto);
+  rpc upgrade(CompInstancesUpgradeRequestProto) returns
+    (CompInstancesUpgradeResponseProto);
 }
 
 message FlexComponentsRequestProto {
@@ -61,13 +63,22 @@ message StopResponseProto {
 
 message UpgradeServiceRequestProto {
   optional string version = 1;
+  optional bool autoFinalize = 2;
 }
 
 message UpgradeServiceResponseProto {
+  optional string error = 1;
 }
 
 message RestartServiceRequestProto {
 }
 
 message RestartServiceResponseProto {
+}
+
+// Request message of the upgrade rpc. Carries a list of container ids
+// (presumably the containers backing the component instances to upgrade).
+message CompInstancesUpgradeRequestProto {
+    repeated string containerIds = 1;
+}
+
+message CompInstancesUpgradeResponseProto {
 }

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java

@@ -108,6 +108,7 @@ public class TestServiceAM extends ServiceTestUtils{
     ApplicationId applicationId = ApplicationId.newInstance(123456, 1);
     Service exampleApp = new Service();
     exampleApp.setId(applicationId.toString());
+    exampleApp.setVersion("v1");
     exampleApp.setName("testContainerCompleted");
     exampleApp.addComponent(createComponent("compa", 1, "pwd"));
 
@@ -146,6 +147,7 @@ public class TestServiceAM extends ServiceTestUtils{
         System.currentTimeMillis(), 1);
     Service exampleApp = new Service();
     exampleApp.setId(applicationId.toString());
+    exampleApp.setVersion("v1");
     exampleApp.setName("testContainersRecovers");
     String comp1Name = "comp1";
     String comp1InstName = "comp1-0";
@@ -189,6 +191,7 @@ public class TestServiceAM extends ServiceTestUtils{
     Service exampleApp = new Service();
     exampleApp.setId(applicationId.toString());
     exampleApp.setName("testContainersRecovers");
+    exampleApp.setVersion("v1");
     String comp1Name = "comp1";
     String comp1InstName = "comp1-0";
 
@@ -230,6 +233,7 @@ public class TestServiceAM extends ServiceTestUtils{
     Service exampleApp = new Service();
     exampleApp.setId(applicationId.toString());
     exampleApp.setName("testContainersFromDifferentApp");
+    exampleApp.setVersion("v1");
     String comp1Name = "comp1";
     String comp1InstName = "comp1-0";
 
@@ -270,6 +274,7 @@ public class TestServiceAM extends ServiceTestUtils{
     Service exampleApp = new Service();
     exampleApp.setId(applicationId.toString());
     exampleApp.setName("testScheduleWithMultipleResourceTypes");
+    exampleApp.setVersion("v1");
 
     List<ResourceTypeInfo> resourceTypeInfos = new ArrayList<>(
         ResourceUtils.getResourcesTypeInfo());

+ 112 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java

@@ -49,7 +49,7 @@ public class TestServiceManager {
   @Test
   public void testUpgrade() throws IOException, SliderException {
     ServiceManager serviceManager = createTestServiceManager("testUpgrade");
-    upgrade(serviceManager, "v2", false);
+    upgrade(serviceManager, "v2", false, false);
     Assert.assertEquals("service not upgraded", ServiceState.UPGRADING,
         serviceManager.getServiceSpec().getState());
   }
@@ -57,8 +57,9 @@ public class TestServiceManager {
   @Test
   public void testRestartNothingToUpgrade()
       throws IOException, SliderException {
-    ServiceManager serviceManager = createTestServiceManager("testRestart");
-    upgrade(serviceManager, "v2", false);
+    ServiceManager serviceManager = createTestServiceManager(
+        "testRestartNothingToUpgrade");
+    upgrade(serviceManager, "v2", false, false);
 
     //make components stable
     serviceManager.getServiceSpec().getComponents().forEach(comp -> {
@@ -69,22 +70,119 @@ public class TestServiceManager {
         serviceManager.getServiceSpec().getState());
   }
 
+  /**
+   * An auto-finalize upgrade without an artifact change must leave the
+   * service STABLE once its components are stable and CHECK_STABLE is
+   * handled.
+   */
+  @Test
+  public void testAutoFinalizeNothingToUpgrade() throws IOException,
+      SliderException {
+    ServiceManager manager = createTestServiceManager(
+        "testAutoFinalizeNothingToUpgrade");
+    upgrade(manager, "v2", false, true);
+
+    // mark every component stable before triggering the stability check
+    manager.getServiceSpec().getComponents().forEach(component -> {
+      component.setState(ComponentState.STABLE);
+    });
+    ServiceEvent checkStable = new ServiceEvent(ServiceEventType.CHECK_STABLE);
+    manager.handle(checkStable);
+
+    ServiceState actualState = manager.getServiceSpec().getState();
+    Assert.assertEquals("service stable", ServiceState.STABLE, actualState);
+  }
+
   @Test
   public void testRestartWithPendingUpgrade()
       throws IOException, SliderException {
     ServiceManager serviceManager = createTestServiceManager("testRestart");
-    upgrade(serviceManager, "v2", true);
+    upgrade(serviceManager, "v2", true, false);
     serviceManager.handle(new ServiceEvent(ServiceEventType.START));
     Assert.assertEquals("service should still be upgrading",
         ServiceState.UPGRADING, serviceManager.getServiceSpec().getState());
   }
 
+  /**
+   * With a pending (non auto-finalize) upgrade the service must remain
+   * UPGRADING after CHECK_STABLE and become STABLE only after the START
+   * event; the finalized definition is then validated.
+   */
+  @Test
+  public void testCheckState() throws IOException, SliderException {
+    ServiceManager manager = createTestServiceManager("testCheckState");
+    upgrade(manager, "v2", true, false);
+    Assert.assertEquals("service not upgrading", ServiceState.UPGRADING,
+        manager.getServiceSpec().getState());
+
+    // make components stable
+    manager.getServiceSpec().getComponents().forEach(
+        component -> component.setState(ComponentState.STABLE));
+    manager.handle(new ServiceEvent(ServiceEventType.CHECK_STABLE));
+    Assert.assertEquals("service should still be upgrading",
+        ServiceState.UPGRADING, manager.getServiceSpec().getState());
+
+    // finalize service
+    manager.handle(new ServiceEvent(ServiceEventType.START));
+    Assert.assertEquals("service not stable",
+        ServiceState.STABLE, manager.getServiceSpec().getState());
+
+    validateUpgradeFinalization(manager.getName(), "v2");
+  }
 
-  private void upgrade(ServiceManager service, String version,
-      boolean upgradeArtifact)
+  /**
+   * With an auto-finalize upgrade the service must become STABLE as soon as
+   * its components are stable and CHECK_STABLE is handled; the finalized
+   * definition is then validated.
+   */
+  @Test
+  public void testCheckStateAutoFinalize() throws IOException, SliderException {
+    // Use a name unique to this test: the service definition is persisted
+    // under its name, and "testCheckState" already belongs to another test.
+    ServiceManager serviceManager = createTestServiceManager(
+        "testCheckStateAutoFinalize");
+    serviceManager.getServiceSpec().setState(
+        ServiceState.UPGRADING_AUTO_FINALIZE);
+    upgrade(serviceManager, "v2", true, true);
+    Assert.assertEquals("service not upgrading",
+        ServiceState.UPGRADING_AUTO_FINALIZE,
+        serviceManager.getServiceSpec().getState());
+
+    // make components stable
+    serviceManager.getServiceSpec().getComponents().forEach(comp ->
+        comp.setState(ComponentState.STABLE));
+    ServiceEvent checkStable = new ServiceEvent(ServiceEventType.CHECK_STABLE);
+    serviceManager.handle(checkStable);
+    Assert.assertEquals("service not stable",
+        ServiceState.STABLE, serviceManager.getServiceSpec().getState());
+
+    validateUpgradeFinalization(serviceManager.getName(), "v2");
+  }
+
+  /**
+   * Processing an upgrade request whose definition changes the lifetime
+   * must fail with an {@link UnsupportedOperationException}.
+   */
+  @Test
+  public void testInvalidUpgrade() throws IOException, SliderException {
+    ServiceManager manager = createTestServiceManager("testInvalidUpgrade");
+    manager.getServiceSpec().setState(ServiceState.UPGRADING_AUTO_FINALIZE);
+
+    Service invalidDef = ServiceTestUtils.createExampleApplication();
+    invalidDef.setName(manager.getName());
+    invalidDef.setVersion("v2");
+    invalidDef.setLifetime(2L);
+    writeUpgradedDef(invalidDef);
+
+    Exception thrown = null;
+    try {
+      manager.processUpgradeRequest("v2", true);
+    } catch (Exception ex) {
+      thrown = ex;
+    }
+    // no exception at all means the invalid upgrade was accepted
+    if (thrown == null) {
+      Assert.fail();
+    }
+    Assert.assertTrue(thrown instanceof UnsupportedOperationException);
+  }
+
+  /**
+   * Loads the persisted definition of the named service and asserts that it
+   * carries the expected version, has an id, and that the service and all
+   * of its components are STABLE.
+   *
+   * @param serviceName name of the service definition to load
+   * @param expectedVersion version the persisted definition must have
+   * @throws IOException if the definition cannot be loaded
+   */
+  private void validateUpgradeFinalization(String serviceName,
+      String expectedVersion) throws IOException {
+    Service persisted = ServiceApiUtil.loadService(rule.getFs(), serviceName);
+    Assert.assertEquals("service def not re-written", expectedVersion,
+        persisted.getVersion());
+    Assert.assertNotNull("app id not present", persisted.getId());
+    Assert.assertEquals("state not stable", ServiceState.STABLE,
+        persisted.getState());
+    persisted.getComponents().forEach(component ->
+        Assert.assertEquals("comp not stable", ComponentState.STABLE,
+            component.getState()));
+  }
+
+  private void upgrade(ServiceManager serviceManager, String version,
+      boolean upgradeArtifact, boolean autoFinalize)
       throws IOException, SliderException {
     Service upgradedDef = ServiceTestUtils.createExampleApplication();
-    upgradedDef.setName(service.getName());
+    upgradedDef.setName(serviceManager.getName());
     upgradedDef.setVersion(version);
     if (upgradeArtifact) {
       Artifact upgradedArtifact = createTestArtifact("2");
@@ -93,9 +191,13 @@ public class TestServiceManager {
       });
     }
     writeUpgradedDef(upgradedDef);
+    serviceManager.processUpgradeRequest(version, autoFinalize);
     ServiceEvent upgradeEvent = new ServiceEvent(ServiceEventType.UPGRADE);
-    upgradeEvent.setVersion("v2");
-    service.handle(upgradeEvent);
+    upgradeEvent.setVersion(version);
+    if (autoFinalize) {
+      upgradeEvent.setAutoFinalize(true);
+    }
+    serviceManager.handle(upgradeEvent);
   }
 
   private ServiceManager createTestServiceManager(String name)
@@ -124,7 +226,7 @@ public class TestServiceManager {
     return new ServiceManager(context);
   }
 
-  static Service createBaseDef(String name) {
+  public static Service createBaseDef(String name) {
     ApplicationId applicationId = ApplicationId.newInstance(
         System.currentTimeMillis(), 1);
     Service serviceDef = ServiceTestUtils.createExampleApplication();

+ 28 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java

@@ -31,9 +31,9 @@ import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.service.api.records.ComponentState;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.service.api.records.Component;
-import org.apache.hadoop.yarn.service.api.records.ComponentState;
 import org.apache.hadoop.yarn.service.api.records.Container;
 import org.apache.hadoop.yarn.service.api.records.ContainerState;
 import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
@@ -372,25 +372,47 @@ public class TestYarnNativeServices extends ServiceTestUtils {
   }
 
   @Test(timeout = 200000)
-  public void testUpgradeService() throws Exception {
+  public void testUpgrade() throws Exception {
     setupInternal(NUM_NMS);
     ServiceClient client = createClient(getConf());
 
     Service service = createExampleApplication();
     client.actionCreate(service);
-    waitForServiceToBeStarted(client, service);
+    waitForServiceToBeStable(client, service);
 
-    //upgrade the service
+    // upgrade the service
+    Component component = service.getComponents().iterator().next();
+    service.setState(ServiceState.UPGRADING);
     service.setVersion("v2");
-    client.actionUpgrade(service);
+    component.getConfiguration().getEnv().put("key1", "val1");
+    client.initiateUpgrade(service);
 
-    //wait for service to be in upgrade state
+    // wait for service to be in upgrade state
     waitForServiceToBeInState(client, service, ServiceState.UPGRADING);
     SliderFileSystem fs = new SliderFileSystem(getConf());
     Service fromFs = ServiceApiUtil.loadServiceUpgrade(fs,
         service.getName(), service.getVersion());
     Assert.assertEquals(service.getName(), fromFs.getName());
     Assert.assertEquals(service.getVersion(), fromFs.getVersion());
+
+    // upgrade containers
+    Service liveService = client.getStatus(service.getName());
+    client.actionUpgrade(service,
+        liveService.getComponent(component.getName()).getContainers());
+    waitForAllCompToBeReady(client, service);
+
+    // finalize the upgrade
+    client.actionStart(service.getName());
+    waitForServiceToBeStable(client, service);
+    Service active = client.getStatus(service.getName());
+    Assert.assertEquals("component not stable", ComponentState.STABLE,
+        active.getComponent(component.getName()).getState());
+    Assert.assertEquals("comp does not have new env", "val1",
+        active.getComponent(component.getName()).getConfiguration()
+            .getEnv("key1"));
+    LOG.info("Stop/destroy service {}", service);
+    client.actionStop(service.getName(), true);
+    client.actionDestroy(service.getName());
   }
 
   // Test to verify ANTI_AFFINITY placement policy

+ 75 - 15
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java

@@ -21,9 +21,9 @@ package org.apache.hadoop.yarn.service.client;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.yarn.client.api.AppAdminClient;
 import org.apache.hadoop.yarn.client.cli.ApplicationCLI;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.service.api.records.Component;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.conf.ExampleAppJson;
@@ -36,12 +36,15 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
+import java.io.PrintStream;
 import java.util.List;
 
+import static org.apache.hadoop.yarn.client.api.AppAdminClient.YARN_APP_ADMIN_CLIENT_PREFIX;
 import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.YARN_SERVICE_BASE_PATH;
+import static org.mockito.Mockito.spy;
 
 public class TestServiceCLI {
   private static final Logger LOG = LoggerFactory.getLogger(TestServiceCLI
@@ -51,33 +54,36 @@ public class TestServiceCLI {
   private File basedir;
   private SliderFileSystem fs;
   private String basedirProp;
+  private ApplicationCLI cli;
 
-  private void runCLI(String[] args) throws Exception {
-    LOG.info("running CLI: yarn {}", Arrays.asList(args));
-    ApplicationCLI cli = new ApplicationCLI();
-    cli.setSysOutPrintStream(System.out);
-    cli.setSysErrPrintStream(System.err);
-    int res = ToolRunner.run(cli, ApplicationCLI.preProcessArgs(args));
-    cli.stop();
+  private void createCLI() {
+    cli = new ApplicationCLI();
+    PrintStream sysOut = spy(new PrintStream(new ByteArrayOutputStream()));
+    PrintStream sysErr = spy(new PrintStream(new ByteArrayOutputStream()));
+    cli.setSysOutPrintStream(sysOut);
+    cli.setSysErrPrintStream(sysErr);
+    conf.set(YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE,
+        DummyServiceClient.class.getName());
+    cli.setConf(conf);
   }
 
   private void buildApp(String serviceName, String appDef) throws Throwable {
     String[] args = {"app",
         "-D", basedirProp, "-save", serviceName,
         ExampleAppJson.resourceName(appDef),
-        "-appTypes", AppAdminClient.UNIT_TEST_TYPE};
-    runCLI(args);
+        "-appTypes", DUMMY_APP_TYPE};
+    ToolRunner.run(cli, ApplicationCLI.preProcessArgs(args));
   }
 
-  private void buildApp(String serviceName, String appDef, String lifetime,
-      String queue) throws Throwable {
+  private void buildApp(String serviceName, String appDef,
+      String lifetime, String queue) throws Throwable {
     String[] args = {"app",
         "-D", basedirProp, "-save", serviceName,
         ExampleAppJson.resourceName(appDef),
-        "-appTypes", AppAdminClient.UNIT_TEST_TYPE,
+        "-appTypes", DUMMY_APP_TYPE,
         "-updateLifetime", lifetime,
         "-changeQueue", queue};
-    runCLI(args);
+    ToolRunner.run(cli, ApplicationCLI.preProcessArgs(args));
   }
 
   @Before
@@ -91,6 +97,7 @@ public class TestServiceCLI {
     } else {
       basedir.mkdirs();
     }
+    createCLI();
   }
 
   @After
@@ -98,6 +105,7 @@ public class TestServiceCLI {
     if (basedir != null) {
       FileUtils.deleteDirectory(basedir);
     }
+    cli.stop();
   }
 
   @Test
@@ -114,6 +122,38 @@ public class TestServiceCLI {
     checkApp(serviceName, "master", 1L, 1000L, "qname");
   }
 
+  /**
+   * Runs {@code -upgrade app-1 -initiate FILE} against the dummy app type
+   * and expects the CLI to exit with code 0.
+   */
+  @Test
+  public void testInitiateServiceUpgrade() throws Exception {
+    String[] args = {"app", "-upgrade", "app-1",
+        "-initiate", ExampleAppJson.resourceName(ExampleAppJson.APP_JSON),
+        "-appTypes", DUMMY_APP_TYPE};
+    int result = cli.run(ApplicationCLI.preProcessArgs(args));
+    // JUnit's signature is assertEquals(expected, actual); keeping that
+    // order makes a failure read "expected 0 but was N".
+    Assert.assertEquals(0, result);
+  }
+
+  /**
+   * Runs {@code -upgrade app-1 -initiate FILE -autoFinalize} against the
+   * dummy app type and expects the CLI to exit with code 0.
+   */
+  @Test
+  public void testInitiateAutoFinalizeServiceUpgrade() throws Exception {
+    String[] args = {"app", "-upgrade", "app-1",
+        "-initiate", ExampleAppJson.resourceName(ExampleAppJson.APP_JSON),
+        "-autoFinalize",
+        "-appTypes", DUMMY_APP_TYPE};
+    int result = cli.run(ApplicationCLI.preProcessArgs(args));
+    // JUnit's signature is assertEquals(expected, actual); keeping that
+    // order makes a failure read "expected 0 but was N".
+    Assert.assertEquals(0, result);
+  }
+
+  /**
+   * Runs {@code -upgrade app-1 -instances comp1-0,comp1-1} against the dummy
+   * app type and expects the CLI to exit with code 0.
+   */
+  @Test
+  public void testUpgradeInstances() throws Exception {
+    // No extra configuration is needed here: createCLI() already registers
+    // DummyServiceClient for DUMMY_APP_TYPE and hands the conf to the CLI.
+    String[] args = {"app", "-upgrade", "app-1",
+        "-instances", "comp1-0,comp1-1",
+        "-appTypes", DUMMY_APP_TYPE};
+    int result = cli.run(ApplicationCLI.preProcessArgs(args));
+    // JUnit's signature is assertEquals(expected, actual); keeping that
+    // order makes a failure read "expected 0 but was N".
+    Assert.assertEquals(0, result);
+  }
+
+
   private void checkApp(String serviceName, String compName, long count, Long
       lifetime, String queue) throws IOException {
     Service service = ServiceApiUtil.loadService(fs, serviceName);
@@ -130,4 +170,24 @@ public class TestServiceCLI {
     }
     Assert.fail();
   }
+
+  private static final String DUMMY_APP_TYPE = "dummy";
+
+  /**
+   * Dummy service client for test purpose.
+   *
+   * Both upgrade entry points are overridden to return exit code 0 without
+   * doing any work, so the CLI tests only exercise argument handling and
+   * dispatch. createCLI() registers this class for DUMMY_APP_TYPE.
+   */
+  public static class DummyServiceClient extends ServiceClient {
+
+    @Override
+    public int initiateUpgrade(String appName, String fileName,
+        boolean autoFinalize) throws IOException, YarnException {
+      return 0; // always reports success; the arguments are ignored
+    }
+
+    @Override
+    public int actionUpgradeInstances(String appName,
+        List<String> componentInstances) throws IOException, YarnException {
+      return 0; // always reports success; the arguments are ignored
+    }
+  }
 }

+ 136 - 51
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java

@@ -24,17 +24,29 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.service.ClientAMProtocol;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
 import org.apache.hadoop.yarn.service.ServiceTestUtils;
+import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.Container;
 import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.api.records.ServiceState;
 import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.mockito.Matchers;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -47,79 +59,152 @@ import static org.mockito.Mockito.when;
  */
 public class TestServiceClient {
 
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TestServiceClient.class);
+
   @Rule
   public ServiceTestUtils.ServiceFSWatcher rule =
       new ServiceTestUtils.ServiceFSWatcher();
 
   @Test
-  public void testActionUpgrade() throws Exception {
-    ApplicationId applicationId = ApplicationId.newInstance(
-        System.currentTimeMillis(), 1);
-    ServiceClient client = createServiceClient(applicationId);
-
-    Service service = ServiceTestUtils.createExampleApplication();
-    service.setVersion("v1");
-    client.actionCreate(service);
+  public void testActionServiceUpgrade() throws Exception {
+    Service service = createService();
+    ServiceClient client = MockServiceClient.create(rule, service);
 
     //upgrade the service
     service.setVersion("v2");
-    client.actionUpgrade(service);
+    client.initiateUpgrade(service);
 
-    //wait for service to be in upgrade state
     Service fromFs = ServiceApiUtil.loadServiceUpgrade(rule.getFs(),
         service.getName(), service.getVersion());
     Assert.assertEquals(service.getName(), fromFs.getName());
     Assert.assertEquals(service.getVersion(), fromFs.getVersion());
+    client.stop();
   }
 
+  @Test
+  public void testActionCompInstanceUpgrade() throws Exception {
+    Service service = createService();
+    MockServiceClient client = MockServiceClient.create(rule, service);
 
-  private ServiceClient createServiceClient(ApplicationId applicationId)
-      throws Exception {
-    ClientAMProtocol amProxy = mock(ClientAMProtocol.class);
-    YarnClient yarnClient = createMockYarnClient();
-    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
-        applicationId, 1);
-    ApplicationAttemptReport attemptReport =
-        ApplicationAttemptReport.newInstance(attemptId, "localhost", 0,
-            null, null, null,
-        YarnApplicationAttemptState.RUNNING, null);
-
-    ApplicationReport appReport = mock(ApplicationReport.class);
-    when(appReport.getHost()).thenReturn("localhost");
-
-    when(yarnClient.getApplicationAttemptReport(Matchers.any()))
-        .thenReturn(attemptReport);
-    when(yarnClient.getApplicationReport(applicationId)).thenReturn(appReport);
-
-    ServiceClient client = new ServiceClient() {
-      @Override
-      protected void serviceInit(Configuration configuration) throws Exception {
-      }
+    //upgrade the service
+    service.setVersion("v2");
+    client.initiateUpgrade(service);
+
+    //add containers to the component that needs to be upgraded.
+    Component comp = service.getComponents().iterator().next();
+    ContainerId containerId = ContainerId.newContainerId(client.attemptId, 1L);
+    comp.addContainer(new Container().id(containerId.toString()));
+
+    client.actionUpgrade(service, comp.getContainers());
+    CompInstancesUpgradeResponseProto response = client.getLastProxyResponse(
+        CompInstancesUpgradeResponseProto.class);
+    Assert.assertNotNull("upgrade did not complete", response);
+    client.stop();
+  }
 
-      @Override
-      protected ClientAMProtocol createAMProxy(String serviceName,
-          ApplicationReport appReport) throws IOException, YarnException {
-        return amProxy;
-      }
+  /**
+   * Returns the example application from {@link ServiceTestUtils}, stamped
+   * with version "v1" and put into the UPGRADING state.
+   */
+  private Service createService() throws IOException,
+      YarnException {
+    Service exampleApp = ServiceTestUtils.createExampleApplication();
+    exampleApp.setVersion("v1");
+    exampleApp.setState(ServiceState.UPGRADING);
+    return exampleApp;
+  }
 
-      @Override
-      ApplicationId submitApp(Service app) throws IOException, YarnException {
-        return applicationId;
+  private static final class MockServiceClient extends ServiceClient {
+
+    private final ApplicationId appId;
+    private final ApplicationAttemptId attemptId;
+    private final ClientAMProtocol amProxy;
+    private Object proxyResponse;
+    private Service service;
+
+    /**
+     * Allocates a fresh application id, its first attempt id and a mocked
+     * AM proxy. Instances are obtained through {@code create}.
+     */
+    private MockServiceClient() {
+      this.appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+      this.attemptId = ApplicationAttemptId.newInstance(this.appId, 1);
+      this.amProxy = mock(ClientAMProtocol.class);
+      LOG.debug("mocking service client for {}", this.appId);
+    }
+
+    /**
+     * Creates a started client whose YARN client and AM proxy are mocks and
+     * submits {@code service} through {@code actionCreate}.
+     *
+     * @param rule    supplies the file system and configuration to use.
+     * @param service the service definition to create.
+     * @return the initialized and started mock client.
+     */
+    static MockServiceClient create(ServiceTestUtils.ServiceFSWatcher rule,
+        Service service)
+        throws IOException, YarnException {
+      MockServiceClient mockClient = new MockServiceClient();
+
+      YarnClient yarnClient = createMockYarnClient();
+
+      // The mocked reports describe a RUNNING application on localhost.
+      ApplicationReport runningReport = mock(ApplicationReport.class);
+      when(runningReport.getHost()).thenReturn("localhost");
+      when(runningReport.getYarnApplicationState()).thenReturn(
+          YarnApplicationState.RUNNING);
+      ApplicationAttemptReport runningAttempt =
+          ApplicationAttemptReport.newInstance(mockClient.attemptId,
+              "localhost", 0, null, null, null,
+              YarnApplicationAttemptState.RUNNING, null);
+
+      when(yarnClient.getApplicationAttemptReport(Matchers.any()))
+          .thenReturn(runningAttempt);
+      when(yarnClient.getApplicationReport(mockClient.appId))
+          .thenReturn(runningReport);
+
+      // Each upgrade overload of the AM proxy remembers the response it
+      // returns so that getLastProxyResponse can hand it to the test.
+      when(mockClient.amProxy.upgrade(
+          Matchers.any(UpgradeServiceRequestProto.class))).thenAnswer(
+          (Answer<UpgradeServiceResponseProto>) invocation -> {
+            UpgradeServiceResponseProto serviceResponse =
+                UpgradeServiceResponseProto.newBuilder().build();
+            mockClient.proxyResponse = serviceResponse;
+            return serviceResponse;
+          });
+      when(mockClient.amProxy.upgrade(
+          Matchers.any(CompInstancesUpgradeRequestProto.class))).thenAnswer(
+          (Answer<CompInstancesUpgradeResponseProto>) invocation -> {
+            CompInstancesUpgradeResponseProto instancesResponse =
+                CompInstancesUpgradeResponseProto.newBuilder().build();
+            mockClient.proxyResponse = instancesResponse;
+            return instancesResponse;
+          });
+
+      mockClient.setFileSystem(rule.getFs());
+      mockClient.setYarnClient(yarnClient);
+      mockClient.service = service;
+
+      mockClient.init(rule.getConf());
+      mockClient.start();
+      mockClient.actionCreate(service);
+      return mockClient;
+    }
+
+    @Override // deliberately empty (presumably skips parent init; confirm)
+    protected void serviceInit(Configuration configuration) throws Exception {
+    }
+
+    @Override // ignores both arguments and hands back the mocked AM proxy
+    protected ClientAMProtocol createAMProxy(String serviceName,
+        ApplicationReport appReport) throws IOException, YarnException {
+      return amProxy; // the mock created in the constructor
+    }
+
+    @Override // this override submits nothing; it only returns a fixed id
+    ApplicationId submitApp(Service app) throws IOException, YarnException {
+      return appId; // the id generated in the constructor
+    }
+
+    @Override // serviceName is ignored; the stored service is always used
+    public Service getStatus(String serviceName) throws IOException,
+        YarnException {
+      service.setState(ServiceState.STABLE); // mutates the stored service
+      return service; // same object that was passed to create()
+    }
+
+    private <T> T getLastProxyResponse(Class<T> clazz) {
+      if (clazz.isInstance(proxyResponse)) {
+        return clazz.cast(proxyResponse);
       }
-    };
-
-    client.setFileSystem(rule.getFs());
-    client.setYarnClient(yarnClient);
-
-    client.init(rule.getConf());
-    client.start();
-    return client;
+      return null;
+    }
   }
 
-  private YarnClient createMockYarnClient() throws IOException, YarnException {
+  private static YarnClient createMockYarnClient() throws IOException,
+      YarnException {
     YarnClient yarnClient = mock(YarnClient.class);
-    when(yarnClient.getApplications(Matchers.any(GetApplicationsRequest.class)))
-        .thenReturn(new ArrayList<>());
+    when(yarnClient.getApplications(Matchers.any(
+        GetApplicationsRequest.class))).thenReturn(new ArrayList<>());
     return yarnClient;
   }
 }

+ 265 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java

@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.component;
+
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.service.ServiceContext;
+import org.apache.hadoop.yarn.service.ServiceScheduler;
+import org.apache.hadoop.yarn.service.ServiceTestUtils;
+import org.apache.hadoop.yarn.service.TestServiceManager;
+import org.apache.hadoop.yarn.service.api.records.ComponentState;
+import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
+import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
+import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link Component}.
+ */
+public class TestComponent {
+
+  @Rule
+  public ServiceTestUtils.ServiceFSWatcher rule =
+      new ServiceTestUtils.ServiceFSWatcher();
+
+  /**
+   * Sends an UPGRADE event to a component prepared by
+   * {@code createTestContext} and expects its spec to report NEEDS_UPGRADE.
+   */
+  @Test
+  public void testComponentUpgrade() throws Exception {
+    ServiceContext context = createTestContext(rule, "testComponentUpgrade");
+    Component target =
+        context.scheduler.getAllComponents().values().iterator().next();
+
+    target.handle(
+        new ComponentEvent(target.getName(), ComponentEventType.UPGRADE));
+    Assert.assertEquals("component not in need upgrade state",
+        ComponentState.NEEDS_UPGRADE, target.getComponentSpec().getState());
+  }
+
+  /**
+   * Starts an upgrade and decrements the needs-upgrade counter twice. After
+   * the first decrement CHECK_STABLE must leave the component in
+   * NEEDS_UPGRADE; after the second it must be STABLE with the target
+   * spec's environment applied.
+   */
+  @Test
+  public void testCheckState() throws Exception {
+    String serviceName = "testCheckState";
+    ServiceContext context = createTestContext(rule, serviceName);
+    Component target =
+        context.scheduler.getAllComponents().values().iterator().next();
+
+    ComponentEvent upgrade =
+        new ComponentEvent(target.getName(), ComponentEventType.UPGRADE)
+            .setTargetSpec(createSpecWithEnv(serviceName, target.getName(),
+                "key1", "val1"))
+            .setUpgradeVersion("v2");
+    target.handle(upgrade);
+
+    // one instance finished upgrading
+    target.decContainersThatNeedUpgrade();
+    target.handle(new ComponentEvent(target.getName(),
+        ComponentEventType.CHECK_STABLE));
+    Assert.assertEquals("component not in need upgrade state",
+        ComponentState.NEEDS_UPGRADE, target.getComponentSpec().getState());
+
+    // second instance finished upgrading
+    target.decContainersThatNeedUpgrade();
+    target.handle(new ComponentEvent(target.getName(),
+        ComponentEventType.CHECK_STABLE));
+
+    Assert.assertEquals("component not in stable state",
+        ComponentState.STABLE, target.getComponentSpec().getState());
+    Assert.assertEquals("component did not upgrade successfully", "val1",
+        target.getComponentSpec().getConfiguration().getEnv("key1"));
+  }
+
+  /**
+   * Covers a container completing with ABORTED while its component is
+   * upgrading: the component must go to FLEXING, and once a replacement
+   * container is assigned and the other instance becomes ready it must end
+   * up STABLE with the target spec's environment applied.
+   */
+  @Test
+  public void testContainerCompletedWhenUpgrading() throws Exception {
+    String serviceName = "testContainerComplete";
+    ServiceContext context = createTestContext(rule, serviceName);
+    Component target =
+        context.scheduler.getAllComponents().values().iterator().next();
+
+    ComponentEvent upgrade =
+        new ComponentEvent(target.getName(), ComponentEventType.UPGRADE)
+            .setTargetSpec(createSpecWithEnv(serviceName, target.getName(),
+                "key1", "val1"))
+            .setUpgradeVersion("v2");
+    target.handle(upgrade);
+    for (ComponentInstance each : target.getAllComponentInstances()) {
+      each.handle(new ComponentInstanceEvent(each.getContainer().getId(),
+          ComponentInstanceEventType.UPGRADE));
+    }
+    Iterator<ComponentInstance> remaining =
+        target.getAllComponentInstances().iterator();
+
+    // reinitialization of a container failed
+    ContainerStatus aborted = mock(ContainerStatus.class);
+    when(aborted.getExitStatus()).thenReturn(ContainerExitStatus.ABORTED);
+    ComponentInstance failed = remaining.next();
+    ComponentEvent completed = new ComponentEvent(target.getName(),
+        ComponentEventType.CONTAINER_COMPLETED)
+        .setInstance(failed)
+        .setContainerId(failed.getContainer().getId())
+        .setStatus(aborted);
+    target.handle(completed);
+    failed.handle(new ComponentInstanceEvent(
+        failed.getContainer().getId(), STOP).setStatus(aborted));
+
+    target.handle(new ComponentEvent(target.getName(),
+        ComponentEventType.CHECK_STABLE));
+
+    Assert.assertEquals("component not in flexing state",
+        ComponentState.FLEXING, target.getComponentSpec().getState());
+
+    // new container get allocated
+    assignNewContainer(context.attemptId, 10, context, target);
+
+    // second instance finished upgrading
+    ComponentInstance survivor = remaining.next();
+    survivor.handle(new ComponentInstanceEvent(
+        survivor.getContainer().getId(),
+        ComponentInstanceEventType.BECOME_READY));
+    target.handle(new ComponentEvent(target.getName(),
+        ComponentEventType.CHECK_STABLE));
+
+    Assert.assertEquals("component not in stable state",
+        ComponentState.STABLE, target.getComponentSpec().getState());
+    Assert.assertEquals("component did not upgrade successfully", "val1",
+        target.getComponentSpec().getConfiguration().getEnv("key1"));
+  }
+
+  /**
+   * Builds a fresh base service definition and returns the spec of component
+   * {@code compName} with environment entry {@code key=val} added.
+   */
+  private static org.apache.hadoop.yarn.service.api.records.Component
+      createSpecWithEnv(String serviceName, String compName, String key,
+      String val) {
+    org.apache.hadoop.yarn.service.api.records.Component compSpec =
+        TestServiceManager.createBaseDef(serviceName).getComponent(compName);
+    compSpec.getConfiguration().getEnv().put(key, val);
+    return compSpec;
+  }
+
+  /**
+   * Builds a {@link ServiceContext} for the base service definition named
+   * {@code serviceName}. The scheduler uses a mocked registry view, a mocked
+   * {@link NMClient} behind its async NM client and a mocked container launch
+   * service; every component is then driven through
+   * {@code stabilizeComponents}.
+   *
+   * @param fsWatcher   supplies the file system and configuration.
+   * @param serviceName name passed to the base service definition.
+   * @return the prepared context.
+   */
+  public static ServiceContext createTestContext(
+      ServiceTestUtils.ServiceFSWatcher fsWatcher, String serviceName)
+      throws Exception {
+    ServiceContext testContext = new ServiceContext();
+    testContext.service = TestServiceManager.createBaseDef(serviceName);
+    testContext.fs = fsWatcher.getFs();
+
+    ContainerLaunchService launchService = mock(ContainerLaunchService.class);
+
+    testContext.scheduler = new ServiceScheduler(testContext) {
+      @Override
+      protected YarnRegistryViewForProviders createYarnRegistryOperations(
+          ServiceContext context, RegistryOperations registryClient) {
+        return mock(YarnRegistryViewForProviders.class);
+      }
+
+      @Override
+      public NMClientAsync createNMClient() {
+        NMClientAsync asyncClient = super.createNMClient();
+        // Every container status query is answered with a RUNNING status
+        // for the container id that was asked about.
+        NMClient statusClient = mock(NMClient.class);
+        try {
+          when(statusClient.getContainerStatus(anyObject(), anyObject()))
+              .thenAnswer((Answer<ContainerStatus>) invocation ->
+                  ContainerStatus.newInstance(
+                      (ContainerId) invocation.getArguments()[0],
+                      org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
+                      "", 0));
+        } catch (YarnException | IOException e) {
+          throw new RuntimeException(e);
+        }
+        asyncClient.setClient(statusClient);
+        return asyncClient;
+      }
+
+      @Override
+      public ContainerLaunchService getContainerLaunchService() {
+        return launchService;
+      }
+    };
+    testContext.scheduler.init(fsWatcher.getConf());
+
+    doNothing().when(launchService).
+        reInitCompInstance(anyObject(), anyObject(), anyObject(), anyObject());
+    stabilizeComponents(testContext);
+    return testContext;
+  }
+
+  /**
+   * Registers a {@link Component} for every component spec of the service,
+   * sends it FLEX, assigns one container per requested instance and finally
+   * sends CHECK_STABLE. Also stores attempt 1 of the service's application
+   * id on the context.
+   */
+  private static void stabilizeComponents(ServiceContext context) {
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
+        ApplicationId.fromString(context.service.getId()), 1);
+    context.attemptId = attemptId;
+
+    Map<String, Component> registered = context.scheduler.getAllComponents();
+    for (org.apache.hadoop.yarn.service.api.records.Component spec :
+        context.service.getComponents()) {
+      // Same-package class, so the simple name is unambiguous here.
+      Component component = new Component(spec, 1L, context);
+      registered.put(component.getName(), component);
+      component.handle(new ComponentEvent(component.getName(),
+          ComponentEventType.FLEX));
+      // Container numbers start at 1.
+      for (int num = 1; num <= spec.getNumberOfContainers(); num++) {
+        assignNewContainer(attemptId, num, context, component);
+      }
+      component.handle(new ComponentEvent(component.getName(),
+          ComponentEventType.CHECK_STABLE));
+    }
+  }
+
+  /**
+   * Allocates container number {@code containerNum} of {@code attemptId} to
+   * {@code component}, then sends START and BECOME_READY to the instance the
+   * scheduler lists for that container.
+   */
+  private static void assignNewContainer(
+      ApplicationAttemptId attemptId, long containerNum,
+      ServiceContext context, Component component) {
+    ContainerId newId = ContainerId.newContainerId(attemptId, containerNum);
+    Container allocated = Container.newInstance(newId, NODE_ID, "localhost",
+        null, null, null);
+    component.handle(new ComponentEvent(component.getName(),
+        ComponentEventType.CONTAINER_ALLOCATED)
+        .setContainer(allocated).setContainerId(allocated.getId()));
+
+    ComponentInstance started =
+        context.scheduler.getLiveInstances().get(allocated.getId());
+    started.handle(new ComponentInstanceEvent(allocated.getId(),
+        ComponentInstanceEventType.START));
+    started.handle(new ComponentInstanceEvent(allocated.getId(),
+        ComponentInstanceEventType.BECOME_READY));
+  }
+
+  private static final NodeId NODE_ID = NodeId.fromString("localhost:0");
+
+}
+

+ 88 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java

@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.component.instance;
+
+import org.apache.hadoop.yarn.service.ServiceContext;
+import org.apache.hadoop.yarn.service.ServiceTestUtils;
+import org.apache.hadoop.yarn.service.api.records.Container;
+import org.apache.hadoop.yarn.service.api.records.ContainerState;
+import org.apache.hadoop.yarn.service.component.Component;
+import org.apache.hadoop.yarn.service.component.ComponentEvent;
+import org.apache.hadoop.yarn.service.component.ComponentEventType;
+import org.apache.hadoop.yarn.service.component.TestComponent;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ComponentInstance}.
+ */
+public class TestComponentInstance {
+
+  @Rule
+  public ServiceTestUtils.ServiceFSWatcher rule =
+      new ServiceTestUtils.ServiceFSWatcher();
+
+  /**
+   * Sends UPGRADE to one instance of an upgrading component and expects the
+   * matching container in the component spec to report UPGRADING.
+   */
+  @Test
+  public void testContainerUpgrade() throws Exception {
+    ServiceContext context = TestComponent.createTestContext(rule,
+        "testContainerUpgrade");
+    Component target =
+        context.scheduler.getAllComponents().values().iterator().next();
+    upgradeComponent(target);
+
+    ComponentInstance first =
+        target.getAllComponentInstances().iterator().next();
+    first.handle(new ComponentInstanceEvent(
+        first.getContainer().getId(), ComponentInstanceEventType.UPGRADE));
+
+    String containerId = first.getContainer().getId().toString();
+    Container upgraded = target.getComponentSpec().getContainer(containerId);
+    Assert.assertEquals("instance not upgrading",
+        ContainerState.UPGRADING, upgraded.getState());
+  }
+
+  /**
+   * Sends UPGRADE followed by BECOME_READY to one instance of an upgrading
+   * component and expects its container in the instance's component spec to
+   * report READY.
+   */
+  @Test
+  public void testContainerReadyAfterUpgrade() throws Exception {
+    ServiceContext context = TestComponent.createTestContext(rule,
+        "testContainerStarted");
+    Component target =
+        context.scheduler.getAllComponents().values().iterator().next();
+    upgradeComponent(target);
+
+    ComponentInstance first =
+        target.getAllComponentInstances().iterator().next();
+
+    first.handle(new ComponentInstanceEvent(
+        first.getContainer().getId(), ComponentInstanceEventType.UPGRADE));
+    first.handle(new ComponentInstanceEvent(
+        first.getContainer().getId(),
+        ComponentInstanceEventType.BECOME_READY));
+
+    String containerId = first.getContainer().getId().toString();
+    Assert.assertEquals("instance not ready",
+        ContainerState.READY,
+        first.getCompSpec().getContainer(containerId).getState());
+  }
+
+  /**
+   * Sends {@code component} an UPGRADE event that targets its own current
+   * spec with upgrade version "v2".
+   */
+  private void upgradeComponent(Component component) {
+    ComponentEvent upgrade =
+        new ComponentEvent(component.getName(), ComponentEventType.UPGRADE);
+    component.handle(upgrade
+        .setTargetSpec(component.getComponentSpec())
+        .setUpgradeVersion("v2"));
+  }
+}

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java

@@ -81,6 +81,7 @@ public class TestServiceMonitor extends ServiceTestUtils {
   public void testComponentDependency() throws Exception{
     ApplicationId applicationId = ApplicationId.newInstance(123456, 1);
     Service exampleApp = new Service();
+    exampleApp.setVersion("v1");
     exampleApp.setId(applicationId.toString());
     exampleApp.setName("testComponentDependency");
     exampleApp.addComponent(createComponent("compa", 1, "sleep 1000"));

+ 20 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -231,18 +232,30 @@ public abstract class AppAdminClient extends CompositeService {
       IOException, YarnException;
 
   /**
-   * Upgrade a long running service.
-   *
-   * @param appName the name of the application
-   * @param fileName specification of application upgrade to save.
+   * Initiate upgrade of a long running service.
    *
+   * @param appName      the name of the application.
+   * @param fileName     specification of application upgrade to save.
+   * @param autoFinalize when true, finalization of upgrade will be done
+   *                     automatically.
    * @return exit code
-   * @throws IOException IOException
+   * @throws IOException   IOException
    * @throws YarnException exception in client or server
    */
   @Public
   @Unstable
-  public abstract int actionUpgrade(String appName, String fileName)
-      throws IOException, YarnException;
+  public abstract int initiateUpgrade(String appName, String fileName,
+      boolean autoFinalize) throws IOException, YarnException;
+
+  /**
+   * Upgrade component instances of a long running service.
+   *
+   * @param appName            the name of the application.
+   * @param componentInstances the names of the component instances.
+   * @return exit code
+   * @throws IOException   IOException
+   * @throws YarnException exception in client or server
+   */
+  @Public
+  @Unstable
+  public abstract int actionUpgradeInstances(String appName,
+      List<String> componentInstances) throws IOException, YarnException;
 
 }

+ 65 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java

@@ -99,6 +99,11 @@ public class ApplicationCLI extends YarnCLI {
   public static final String FLEX_CMD = "flex";
   public static final String COMPONENT = "component";
   public static final String ENABLE_FAST_LAUNCH = "enableFastLaunch";
+  public static final String UPGRADE_CMD = "upgrade";
+  public static final String UPGRADE_INITIATE = "initiate";
+  public static final String UPGRADE_AUTO_FINALIZE = "autoFinalize";
+  public static final String UPGRADE_FINALIZE = "finalize";
+  public static final String COMPONENT_INSTS = "instances";
 
   private static String firstArg = null;
 
@@ -236,6 +241,20 @@ public class ApplicationCLI extends YarnCLI {
           "to HDFS to make future launches faster. Supports -appTypes option " +
           "to specify which client implementation to use. Optionally a " +
           "destination folder for the tarball can be specified.");
+      opts.addOption(UPGRADE_CMD, true, "Upgrades an application/long-" +
+          "running service. It requires either -initiate, -instances, or " +
+          "-finalize options.");
+      opts.addOption(UPGRADE_INITIATE, true, "Works with -upgrade option to " +
+          "initiate the application upgrade. It requires the upgraded " +
+          "application specification file.");
+      opts.addOption(COMPONENT_INSTS, true, "Works with -upgrade option to " +
+          "trigger the upgrade of specified component instances of the " +
+          "application.");
+      opts.addOption(UPGRADE_FINALIZE, false, "Works with -upgrade option to " +
+          "finalize the upgrade.");
+      opts.addOption(UPGRADE_AUTO_FINALIZE, false, "Works with -upgrade and " +
+          "-initiate options to initiate the upgrade of the application with " +
+          "the ability to finalize the upgrade automatically.");
       opts.getOption(LAUNCH_CMD).setArgName("Application Name> <File Name");
       opts.getOption(LAUNCH_CMD).setArgs(2);
       opts.getOption(START_CMD).setArgName("Application Name");
@@ -248,6 +267,13 @@ public class ApplicationCLI extends YarnCLI {
       opts.getOption(COMPONENT).setArgs(2);
       opts.getOption(ENABLE_FAST_LAUNCH).setOptionalArg(true);
       opts.getOption(ENABLE_FAST_LAUNCH).setArgName("Destination Folder");
+      opts.getOption(UPGRADE_CMD).setArgName("Application Name");
+      opts.getOption(UPGRADE_CMD).setArgs(1);
+      opts.getOption(UPGRADE_INITIATE).setArgName("File Name");
+      opts.getOption(UPGRADE_INITIATE).setArgs(1);
+      opts.getOption(COMPONENT_INSTS).setArgName("Component Instances");
+      opts.getOption(COMPONENT_INSTS).setValueSeparator(',');
+      opts.getOption(COMPONENT_INSTS).setArgs(Option.UNLIMITED_VALUES);
     } else if (title != null && title.equalsIgnoreCase(APPLICATION_ATTEMPT)) {
       opts.addOption(STATUS_CMD, true,
           "Prints the status of the application attempt.");
@@ -546,6 +572,45 @@ public class ApplicationCLI extends YarnCLI {
       }
       moveApplicationAcrossQueues(cliParser.getOptionValue(APP_ID),
           cliParser.getOptionValue(CHANGE_APPLICATION_QUEUE));
+    } else if (cliParser.hasOption(UPGRADE_CMD)) {
+      if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD, UPGRADE_INITIATE,
+          UPGRADE_AUTO_FINALIZE, UPGRADE_FINALIZE, COMPONENT_INSTS,
+          APP_TYPE_CMD)) {
+        printUsage(title, opts);
+        return exitCode;
+      }
+      String appType = getSingleAppTypeFromCLI(cliParser);
+      AppAdminClient client =  AppAdminClient.createAppAdminClient(appType,
+          getConf());
+      String appName = cliParser.getOptionValue(UPGRADE_CMD);
+      if (cliParser.hasOption(UPGRADE_INITIATE)) {
+        if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD,
+            UPGRADE_INITIATE, UPGRADE_AUTO_FINALIZE, APP_TYPE_CMD)) {
+          printUsage(title, opts);
+          return exitCode;
+        }
+        String fileName = cliParser.getOptionValue(UPGRADE_INITIATE);
+        if (cliParser.hasOption(UPGRADE_AUTO_FINALIZE)) {
+          return client.initiateUpgrade(appName, fileName, true);
+        } else {
+          return client.initiateUpgrade(appName, fileName, false);
+        }
+      } else if (cliParser.hasOption(COMPONENT_INSTS)) {
+        if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD,
+            COMPONENT_INSTS, APP_TYPE_CMD)) {
+          printUsage(title, opts);
+          return exitCode;
+        }
+        String[] instances = cliParser.getOptionValues(COMPONENT_INSTS);
+        return client.actionUpgradeInstances(appName, Arrays.asList(instances));
+      } else if (cliParser.hasOption(UPGRADE_FINALIZE)) {
+        if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD,
+            UPGRADE_FINALIZE, APP_TYPE_CMD)) {
+          printUsage(title, opts);
+          return exitCode;
+        }
+        return client.actionStart(appName);
+      }
     } else {
       syserr.println("Invalid Command Usage : ");
       printUsage(title, opts);

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java

@@ -2124,6 +2124,11 @@ public class TestYarnCLI {
     pw.println("                                          applications based on input");
     pw.println("                                          comma-separated list of");
     pw.println("                                          application types.");
+    pw.println(" -autoFinalize                            Works with -upgrade and");
+    pw.println("                                          -initiate options to initiate");
+    pw.println("                                          the upgrade of the application");
+    pw.println("                                          with the ability to finalize the");
+    pw.println("                                          upgrade automatically.");
     pw.println(" -changeQueue <Queue Name>                Moves application to a new");
     pw.println("                                          queue. ApplicationId can be");
     pw.println("                                          passed using 'appId' option.");
@@ -2152,6 +2157,8 @@ public class TestYarnCLI {
     pw.println("                                          Optionally a destination folder");
     pw.println("                                          for the tarball can be");
     pw.println("                                          specified.");
+    pw.println(" -finalize                                Works with -upgrade option to");
+    pw.println("                                          finalize the upgrade.");
     pw.println(" -flex <Application Name or ID>           Changes number of running");
     pw.println("                                          containers for a component of an");
     pw.println("                                          application / long-running");
@@ -2165,6 +2172,15 @@ public class TestYarnCLI {
     pw.println("                                          which client implementation to");
     pw.println("                                          use.");
     pw.println(" -help                                    Displays help for all commands.");
+    pw.println(" -initiate <File Name>                    Works with -upgrade option to");
+    pw.println("                                          initiate the application");
+    pw.println("                                          upgrade. It requires the");
+    pw.println("                                          upgraded application");
+    pw.println("                                          specification file.");
+    pw.println(" -instances <Component Instances>         Works with -upgrade option to");
+    pw.println("                                          trigger the upgrade of specified");
+    pw.println("                                          component instances of the");
+    pw.println("                                          application.");
     pw.println(" -kill <Application ID>                   Kills the application. Set of");
     pw.println("                                          applications can be provided");
     pw.println("                                          separated with space");
@@ -2232,6 +2248,11 @@ public class TestYarnCLI {
     pw.println(" -updatePriority <Priority>               update priority of an");
     pw.println("                                          application. ApplicationId can");
     pw.println("                                          be passed using 'appId' option.");
+    pw.println(" -upgrade <Application Name>              Upgrades an");
+    pw.println("                                          application/long-running");
+    pw.println("                                          service. It requires either");
+    pw.println("                                          -initiate, -instances, or");
+    pw.println("                                          -finalize options.");
     pw.close();
     String appsHelpStr = baos.toString("UTF-8");
     return appsHelpStr;