瀏覽代碼

YARN-8901. Fixed restart policy NEVER/ON_FAILURE with component dependency.
Contributed by Suma Shivaprasad

(cherry picked from commit f5a95f7998e110cab81e52acd99b07e13ea9653d)

Eric Yang 6 年之前
父節點
當前提交
8c332affa8

+ 7 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/NeverRestartPolicy.java

@@ -59,11 +59,14 @@ public final class NeverRestartPolicy implements ComponentRestartPolicy {
     return false;
   }
 
-  @Override public boolean isReadyForDownStream(Component component) {
-    if (hasCompleted(component)) {
-      return true;
+  @Override public boolean isReadyForDownStream(Component dependentComponent) {
+    if (dependentComponent.getNumReadyInstances()
+        + dependentComponent.getNumSucceededInstances()
+        + dependentComponent.getNumFailedInstances()
+        < dependentComponent.getNumDesiredInstances()) {
+      return false;
     }
-    return false;
+    return true;
   }
 
   @Override public boolean allowUpgrades() {

+ 7 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/OnFailureRestartPolicy.java

@@ -65,12 +65,14 @@ public final class OnFailureRestartPolicy implements ComponentRestartPolicy {
     return false;
   }
 
-  @Override public boolean isReadyForDownStream(Component component) {
-    if (hasCompletedSuccessfully(component)) {
-      return true;
+  @Override public boolean isReadyForDownStream(Component dependentComponent) {
+    if (dependentComponent.getNumReadyInstances()
+        + dependentComponent.getNumSucceededInstances()
+        + dependentComponent.getNumFailedInstances()
+        < dependentComponent.getNumDesiredInstances()) {
+      return false;
     }
-
-    return false;
+    return true;
   }
 
   @Override public boolean allowUpgrades() {

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java

@@ -65,6 +65,7 @@ public class TestComponentRestartPolicy {
     when(component.getNumSucceededInstances()).thenReturn(new Long(1));
     when(component.getNumFailedInstances()).thenReturn(new Long(2));
     when(component.getNumDesiredInstances()).thenReturn(3);
+    when(component.getNumReadyInstances()).thenReturn(3);
 
     ComponentInstance instance = mock(ComponentInstance.class);
     when(instance.getComponent()).thenReturn(component);
@@ -92,6 +93,7 @@ public class TestComponentRestartPolicy {
     when(component.getNumSucceededInstances()).thenReturn(new Long(3));
     when(component.getNumFailedInstances()).thenReturn(new Long(0));
     when(component.getNumDesiredInstances()).thenReturn(3);
+    when(component.getNumReadyInstances()).thenReturn(3);
 
     ComponentInstance instance = mock(ComponentInstance.class);
     when(instance.getComponent()).thenReturn(component);
@@ -123,7 +125,7 @@ public class TestComponentRestartPolicy {
     assertEquals(true,
         restartPolicy.shouldRelaunchInstance(instance, containerStatus));
 
-    assertEquals(false, restartPolicy.isReadyForDownStream(component));
+    assertEquals(true, restartPolicy.isReadyForDownStream(component));
 
   }
 }

+ 12 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java

@@ -85,11 +85,15 @@ public class TestServiceMonitor extends ServiceTestUtils {
     exampleApp.setId(applicationId.toString());
     exampleApp.setName("testComponentDependency");
     exampleApp.addComponent(createComponent("compa", 1, "sleep 1000"));
-    Component compb = createComponent("compb", 1, "sleep 1000");
-
     // Let compb depends on compa;
-    compb.setDependencies(Collections.singletonList("compa"));
+    Component compb = createComponent("compb", 1, "sleep 1000", Component
+        .RestartPolicyEnum.ON_FAILURE, Collections.singletonList("compa"));
+    // Let compb depends on compb;
+    Component compc = createComponent("compc", 1, "sleep 1000", Component
+        .RestartPolicyEnum.NEVER, Collections.singletonList("compb"));
+
     exampleApp.addComponent(compb);
+    exampleApp.addComponent(compc);
 
     MockServiceAM am = new MockServiceAM(exampleApp);
     am.init(conf);
@@ -105,6 +109,11 @@ public class TestServiceMonitor extends ServiceTestUtils {
     // waiting for compb's dependencies are satisfied
     am.waitForDependenciesSatisfied("compb");
 
+    // feed 1 container to compb,
+    am.feedContainerToComp(exampleApp, 2, "compb");
+    // waiting for compc's dependencies are satisfied
+    am.waitForDependenciesSatisfied("compc");
+
     // feed 1 container to compb
     am.feedContainerToComp(exampleApp, 2, "compb");
     am.flexComponent("compa", 2);