Преглед изворни кода

YARN-8758. Support getting PreemptionMessage when using AMRMClientAsyn. (Zian Chen via wangda)

Change-Id: Ibf5d165f49957b582eeadeb41dc285c84d2f05e7
(cherry picked from commit 6926fd0ec634df2576bbc9f45e9636b99260db72)
(cherry picked from commit abe4a8e5d82e70ce991e2830f20eba9f25a2491a)
Wangda Tan пре 6 година
родитељ
комит
30f50a0682

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -553,6 +554,16 @@ extends AbstractService {
     public void onRequestsRejected(
         List<RejectedSchedulingRequest> rejectedSchedulingRequests) {
     }
+
+    /**
+     * Called when the RM responds to a heartbeat with preemption message
+     * @param preemptionMessage
+     */
+    @Public
+    @Unstable
+    public void onPreemptionMessageReceived(
+        PreemptionMessage preemptionMessage) {
+    }
   }
 
   /**

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -401,6 +402,14 @@ extends AMRMClientAsync<T> {
             handler.onContainersAllocated(allocated);
           }
 
+          PreemptionMessage preemptionMessage = response.getPreemptionMessage();
+          if (preemptionMessage != null) {
+            if (handler instanceof AMRMClientAsync.AbstractCallbackHandler) {
+              ((AMRMClientAsync.AbstractCallbackHandler) handler)
+                  .onPreemptionMessageReceived(preemptionMessage);
+            }
+          }
+
           if (!response.getContainersFromPreviousAttempts().isEmpty()) {
             if (handler instanceof AMRMClientAsync.AbstractCallbackHandler) {
               ((AMRMClientAsync.AbstractCallbackHandler) handler)