瀏覽代碼

YARN-8411. Restart stopped system service during RM start.
Contributed by Billie Rinaldi

(cherry picked from commit 69b05968974994c6e22d6562a67b9392d1700094)

Eric Yang 7 年之前
父節點
當前提交
aba67c360c

+ 25 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/SystemServiceManagerImpl.java

@@ -29,7 +29,9 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.service.SystemServiceManager;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.ServiceState;
+import org.apache.hadoop.yarn.service.conf.SliderExitCodes;
 import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
+import org.apache.hadoop.yarn.service.exceptions.SliderException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -228,12 +230,31 @@ public class SystemServiceManagerImpl extends AbstractService
           userUgi.doAs(new PrivilegedExceptionAction<ApplicationId>() {
             @Override public ApplicationId run()
                 throws IOException, YarnException {
-              ApplicationId applicationId = serviceClient.actionCreate(service);
-              return applicationId;
+              boolean tryStart = true;
+              try {
+                serviceClient.actionBuild(service);
+              } catch (Exception e) {
+                if (e instanceof SliderException && ((SliderException) e)
+                    .getExitCode() == SliderExitCodes.EXIT_INSTANCE_EXISTS) {
+                  LOG.info("Service {} already exists, will attempt to start " +
+                      "service", service.getName());
+                } else {
+                  tryStart = false;
+                  LOG.info("Got exception saving {}, will not attempt to " +
+                      "start service", service.getName(), e);
+                }
+              }
+              if (tryStart) {
+                return serviceClient.actionStartAndGetId(service.getName());
+              } else {
+                return null;
+              }
             }
           });
-      LOG.info("Service {} submitted with Application ID: {}",
-          service.getName(), applicationId);
+      if (applicationId != null) {
+        LOG.info("Service {} submitted with Application ID: {}",
+            service.getName(), applicationId);
+      }
     }
   }
 

+ 1 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java

@@ -648,8 +648,7 @@ public class ApiServer {
             ServiceClient sc = getServiceClient();
             sc.init(YARN_CONFIG);
             sc.start();
-            sc.actionStart(appName);
-            ApplicationId appId = sc.getAppId(appName);
+            ApplicationId appId = sc.actionStartAndGetId(appName);
             sc.close();
             return appId;
           }

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/ServiceClientTest.java

@@ -103,13 +103,13 @@ public class ServiceClientTest extends ServiceClient {
   }
 
   @Override
-  public int actionStart(String serviceName)
+  public ApplicationId actionStartAndGetId(String serviceName)
       throws YarnException, IOException {
     if (serviceName != null && serviceName.equals("jenkins")) {
       ApplicationId appId =
           ApplicationId.newInstance(System.currentTimeMillis(), 1);
       serviceAppId.put(serviceName, appId);
-      return EXIT_SUCCESS;
+      return appId;
     } else {
       throw new ApplicationNotFoundException("");
     }

+ 34 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestSystemServiceManagerImpl.java

@@ -22,7 +22,9 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.conf.SliderExitCodes;
 import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
+import org.apache.hadoop.yarn.service.exceptions.SliderException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -51,6 +53,7 @@ public class TestSystemServiceManagerImpl {
 
   private String[] users = new String[] {"user1", "user2"};
   private static Map<String, Set<String>> loadedServices = new HashMap<>();
+  private static Map<String, Set<String>> savedServices = new HashMap<>();
   private static Map<String, Set<String>> submittedServices = new HashMap<>();
 
   @Before
@@ -72,7 +75,7 @@ public class TestSystemServiceManagerImpl {
   }
 
   @After
-  public void teadDown() {
+  public void tearDown() {
     systemService.stop();
   }
 
@@ -102,6 +105,11 @@ public class TestSystemServiceManagerImpl {
     // 2nd time launch service to handle if service exist scenario
     systemService.launchUserService(userServices);
     verifyForLaunchedUserServices();
+
+    // verify start of stopped services
+    submittedServices.clear();
+    systemService.launchUserService(userServices);
+    verifyForLaunchedUserServices();
   }
 
   private void verifyForScannedUserServices(
@@ -149,21 +157,41 @@ public class TestSystemServiceManagerImpl {
     }
 
     @Override
-    public ApplicationId actionCreate(Service service)
+    public int actionBuild(Service service)
         throws YarnException, IOException {
       String userName =
           UserGroupInformation.getCurrentUser().getShortUserName();
-      Set<String> services = submittedServices.get(userName);
+      Set<String> services = savedServices.get(userName);
       if (services == null) {
         services = new HashSet<>();
-        submittedServices.put(userName, services);
+        savedServices.put(userName, services);
       }
       if (services.contains(service.getName())) {
-        String message = "Failed to create service " + service.getName()
+        String message = "Failed to save service " + service.getName()
             + ", because it already exists.";
-        throw new YarnException(message);
+        throw new SliderException(SliderExitCodes.EXIT_INSTANCE_EXISTS,
+            message);
       }
       services.add(service.getName());
+      return 0;
+    }
+
+    @Override
+    public ApplicationId actionStartAndGetId(String serviceName)
+        throws YarnException, IOException {
+      String userName =
+          UserGroupInformation.getCurrentUser().getShortUserName();
+      Set<String> services = submittedServices.get(userName);
+      if (services == null) {
+        services = new HashSet<>();
+        submittedServices.put(userName, services);
+      }
+      if (services.contains(serviceName)) {
+        String message = "Failed to create service " + serviceName
+            + ", because it is already running.";
+        throw new YarnException(message);
+      }
+      services.add(serviceName);
       return ApplicationId.newInstance(System.currentTimeMillis(), 1);
     }
   }

+ 10 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java

@@ -1003,6 +1003,12 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
 
   @Override
   public int actionStart(String serviceName) throws YarnException, IOException {
+    actionStartAndGetId(serviceName);
+    return EXIT_SUCCESS;
+  }
+
+  public ApplicationId actionStartAndGetId(String serviceName) throws
+      YarnException, IOException {
     ServiceApiUtil.validateNameFormat(serviceName, getConfig());
     Service liveService = getStatus(serviceName);
     if (liveService == null ||
@@ -1019,11 +1025,11 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
       // write app definition on to hdfs
       Path appJson = ServiceApiUtil.writeAppDefinition(fs, appDir, service);
       LOG.info("Persisted service " + service.getName() + " at " + appJson);
-      return 0;
+      return appId;
     } else {
       LOG.info("Finalize service {} upgrade");
-      ApplicationReport appReport =
-          yarnClient.getApplicationReport(getAppId(serviceName));
+      ApplicationId appId = getAppId(serviceName);
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
       if (StringUtils.isEmpty(appReport.getHost())) {
         throw new YarnException(serviceName + " AM hostname is empty");
       }
@@ -1032,7 +1038,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
       RestartServiceRequestProto.Builder requestBuilder =
           RestartServiceRequestProto.newBuilder();
       proxy.restart(requestBuilder.build());
-      return 0;
+      return appId;
     }
   }