Browse Source

YARN-4164. Changed updateApplicationPriority API to return the updated application priority. Contributed by Rohith Sharma K S

Jian He 9 years ago
parent
commit
42d1b38989

+ 2 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java

@@ -490,9 +490,9 @@ public class ResourceMgrDelegate extends YarnClient {
   }
 
   @Override
-  public void updateApplicationPriority(ApplicationId applicationId,
+  public Priority updateApplicationPriority(ApplicationId applicationId,
       Priority priority) throws YarnException, IOException {
-    client.updateApplicationPriority(applicationId, priority);
+    return client.updateApplicationPriority(applicationId, priority);
   }
 
   @Override

+ 17 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationPriorityResponse.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.api.protocolrecords;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.util.Records;
 
 /**
@@ -39,9 +40,24 @@ import org.apache.hadoop.yarn.util.Records;
 @Unstable
 public abstract class UpdateApplicationPriorityResponse {
 
-  public static UpdateApplicationPriorityResponse newInstance() {
+  public static UpdateApplicationPriorityResponse newInstance(
+      Priority priority) {
     UpdateApplicationPriorityResponse response =
         Records.newRecord(UpdateApplicationPriorityResponse.class);
+    response.setApplicationPriority(priority);
     return response;
   }
+
+  /**
+   * Get the <code>Priority</code> of the application to be set.
+   * @return Updated <code>Priority</code> of the application.
+   */
+  public abstract Priority getApplicationPriority();
+
+  /**
+   * Set the <code>Priority</code> of the application.
+   *
+   * @param priority <code>Priority</code> of the application
+   */
+  public abstract void setApplicationPriority(Priority priority);
 }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto

@@ -256,6 +256,7 @@ message UpdateApplicationPriorityRequestProto {
 }
 
 message UpdateApplicationPriorityResponseProto {
+   optional PriorityProto applicationPriority = 1;
 }
 
 message SignalContainerRequestProto {

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java

@@ -770,12 +770,14 @@ public abstract class YarnClient extends AbstractService {
    * </p>
    * @param applicationId
    * @param priority
+   * @return updated priority of an application.
    * @throws YarnException
    * @throws IOException
    */
   @Public
   @Unstable
-  public abstract void updateApplicationPriority(ApplicationId applicationId,
+  public abstract Priority updateApplicationPriority(
+      ApplicationId applicationId,
       Priority priority) throws YarnException, IOException;
 
   /**

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java

@@ -878,11 +878,11 @@ public class YarnClientImpl extends YarnClient {
   }
 
   @Override
-  public void updateApplicationPriority(ApplicationId applicationId,
+  public Priority updateApplicationPriority(ApplicationId applicationId,
       Priority priority) throws YarnException, IOException {
     UpdateApplicationPriorityRequest request =
         UpdateApplicationPriorityRequest.newInstance(applicationId, priority);
-    rmClient.updateApplicationPriority(request);
+    return rmClient.updateApplicationPriority(request).getApplicationPriority();
   }
 
   @Override

+ 13 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java

@@ -746,10 +746,19 @@ public class ApplicationCLI extends YarnCLI {
       throws YarnException, IOException {
     ApplicationId appId = ApplicationId.fromString(applicationId);
     Priority newAppPriority = Priority.newInstance(Integer.parseInt(priority));
-    sysout.println("Updating priority of an application " + applicationId);
-    client.updateApplicationPriority(appId, newAppPriority);
-    sysout.println("Successfully updated the application with id "
-        + applicationId + " with priority '" + priority + "'");
+    sysout.println("Updating priority of an aplication " + applicationId);
+    Priority updateApplicationPriority =
+        client.updateApplicationPriority(appId, newAppPriority);
+    if (newAppPriority.equals(updateApplicationPriority)) {
+      sysout.println("Successfully updated the application "
+          + applicationId + " with priority '" + priority + "'");
+    } else {
+      sysout
+          .println("Updated priority of an application  "
+              + applicationId
+          + " to cluster max priority OR keeping old priority"
+          + " as application is in final states");
+    }
   }
 
   @SuppressWarnings("unchecked")

+ 63 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationPriorityResponsePBImpl.java

@@ -19,7 +19,11 @@
 package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
 
 import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProtoOrBuilder;
 
 import com.google.protobuf.TextFormat;
 
@@ -31,6 +35,8 @@ public class UpdateApplicationPriorityResponsePBImpl extends
   UpdateApplicationPriorityResponseProto.Builder builder = null;
   boolean viaProto = false;
 
+  private Priority updatedAppPriority = null;
+
   public UpdateApplicationPriorityResponsePBImpl() {
     builder = UpdateApplicationPriorityResponseProto.newBuilder();
   }
@@ -42,11 +48,68 @@ public class UpdateApplicationPriorityResponsePBImpl extends
   }
 
   public UpdateApplicationPriorityResponseProto getProto() {
+    mergeLocalToProto();
     proto = viaProto ? proto : builder.build();
     viaProto = true;
     return proto;
   }
 
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = UpdateApplicationPriorityResponseProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  private void mergeLocalToBuilder() {
+    if (this.updatedAppPriority != null) {
+      builder
+          .setApplicationPriority(
+              convertToProtoFormat(this.updatedAppPriority));
+    }
+  }
+
+  @Override
+  public Priority getApplicationPriority() {
+    UpdateApplicationPriorityResponseProtoOrBuilder p =
+        viaProto ? proto : builder;
+    if (this.updatedAppPriority != null) {
+      return this.updatedAppPriority;
+    }
+    if (!p.hasApplicationPriority()) {
+      return null;
+    }
+    this.updatedAppPriority =
+        convertFromProtoFormat(p.getApplicationPriority());
+    return this.updatedAppPriority;
+  }
+
+  @Override
+  public void setApplicationPriority(Priority priority) {
+    maybeInitBuilder();
+    if (priority == null) {
+      builder.clearApplicationPriority();
+    }
+    this.updatedAppPriority = priority;
+  }
+
+  private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
+    return new PriorityPBImpl(p);
+  }
+
+  private PriorityProto convertToProtoFormat(Priority t) {
+    return ((PriorityPBImpl) t).getProto();
+  }
+
   @Override
   public int hashCode() {
     return getProto().hashCode();

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java

@@ -1606,6 +1606,8 @@ public class ClientRMService extends AbstractService implements
         RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
             AuditConstants.UPDATE_APP_PRIORITY, "ClientRMService",
             applicationId);
+        response.setApplicationPriority(application
+            .getApplicationSubmissionContext().getPriority());
         return response;
       }
       String msg = "Application in " + application.getState()
@@ -1629,6 +1631,8 @@ public class ClientRMService extends AbstractService implements
 
     RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
         AuditConstants.UPDATE_APP_PRIORITY, "ClientRMService", applicationId);
+    response.setApplicationPriority(application
+        .getApplicationSubmissionContext().getPriority());
     return response;
   }
 
@@ -1637,6 +1641,7 @@ public class ClientRMService extends AbstractService implements
    * After the request passes some sanity check, it will be delivered
    * to RMNodeImpl so that the next NM heartbeat will pick up the signal request
    */
+  @SuppressWarnings("unchecked")
   @Override
   public SignalContainerResponse signalToContainer(
       SignalContainerRequest request) throws YarnException, IOException {

+ 40 - 34
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java

@@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -1661,49 +1662,37 @@ public class TestClientRMService {
         appPriority, app1.getApplicationSubmissionContext().getPriority()
             .getPriority());
 
-    appPriority = 9;
+    appPriority = 11;
     ClientRMService rmService = rm.getClientRMService();
-    UpdateApplicationPriorityRequest updateRequest =
-        UpdateApplicationPriorityRequest.newInstance(app1.getApplicationId(),
-            Priority.newInstance(appPriority));
-
-    rmService.updateApplicationPriority(updateRequest);
+    testAplicationPriorityUpdation(rmService, app1, appPriority, maxPriority);
 
-    Assert.assertEquals("Incorrect priority has been set to application",
-        appPriority, app1.getApplicationSubmissionContext().getPriority()
-            .getPriority());
+    appPriority = 9;
+    testAplicationPriorityUpdation(rmService, app1, appPriority, appPriority);
 
     rm.killApp(app1.getApplicationId());
     rm.waitForState(app1.getApplicationId(), RMAppState.KILLED);
 
-    appPriority = 8;
-    UpdateApplicationPriorityRequest updateRequestNew =
-        UpdateApplicationPriorityRequest.newInstance(app1.getApplicationId(),
-            Priority.newInstance(appPriority));
-    // Update priority request for application in KILLED state
-    rmService.updateApplicationPriority(updateRequestNew);
-
-    // Hence new priority should not be updated
-    Assert.assertNotEquals("Priority should not be updated as app is in KILLED state",
-        appPriority, app1.getApplicationSubmissionContext().getPriority()
-            .getPriority());
-    Assert.assertEquals("Priority should be same as old one before update",
-        9, app1.getApplicationSubmissionContext().getPriority()
-            .getPriority());
 
     // Update priority request for invalid application id.
     ApplicationId invalidAppId = ApplicationId.newInstance(123456789L, 3);
-    updateRequest =
+    UpdateApplicationPriorityRequest updateRequest =
         UpdateApplicationPriorityRequest.newInstance(invalidAppId,
             Priority.newInstance(appPriority));
     try {
       rmService.updateApplicationPriority(updateRequest);
-      Assert
-          .fail("ApplicationNotFoundException should be thrown for invalid application id");
+      Assert.fail("ApplicationNotFoundException should be thrown "
+          + "for invalid application id");
     } catch (ApplicationNotFoundException e) {
       // Expected
     }
 
+    updateRequest =
+        UpdateApplicationPriorityRequest.newInstance(app1.getApplicationId(),
+            Priority.newInstance(11));
+    Assert.assertEquals("Incorrect priority has been set to application",
+        appPriority, rmService.updateApplicationPriority(updateRequest)
+            .getApplicationPriority().getPriority());
+
     rm.stop();
   }
 
@@ -1723,23 +1712,23 @@ public class TestClientRMService {
     String excludeFile = "excludeFile";
     createExcludeFile(excludeFile);
     YarnConfiguration conf = new YarnConfiguration();
-    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
-        excludeFile);
+    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeFile);
     MockRM rm = new MockRM(conf) {
       protected ClientRMService createClientRMService() {
-        return new ClientRMService(this.rmContext, scheduler,
-            this.rmAppManager, this.applicationACLsManager, this.queueACLsManager,
+        return new ClientRMService(this.rmContext, scheduler, this.rmAppManager,
+            this.applicationACLsManager, this.queueACLsManager,
             this.getRMContext().getRMDelegationTokenSecretManager());
-      };
+      }
+
+      ;
     };
     rm.start();
 
     YarnRPC rpc = YarnRPC.create(conf);
     InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
     LOG.info("Connecting to ResourceManager at " + rmAddress);
-    ApplicationClientProtocol client =
-        (ApplicationClientProtocol) rpc
-            .getProxy(ApplicationClientProtocol.class, rmAddress, conf);
+    ApplicationClientProtocol client = (ApplicationClientProtocol) rpc
+        .getProxy(ApplicationClientProtocol.class, rmAddress, conf);
 
     // Make call
     GetClusterNodesRequest request =
@@ -1751,4 +1740,21 @@ public class TestClientRMService {
     rpc.stopProxy(client, conf);
     new File(excludeFile).delete();
   }
+
+  private void testAplicationPriorityUpdation(ClientRMService rmService,
+      RMApp app1, int tobeUpdatedPriority, int expected) throws YarnException,
+      IOException {
+    UpdateApplicationPriorityRequest updateRequest =
+        UpdateApplicationPriorityRequest.newInstance(app1.getApplicationId(),
+            Priority.newInstance(tobeUpdatedPriority));
+
+    UpdateApplicationPriorityResponse updateApplicationPriority =
+        rmService.updateApplicationPriority(updateRequest);
+
+    Assert.assertEquals("Incorrect priority has been set to application",
+        expected, app1.getApplicationSubmissionContext().getPriority()
+            .getPriority());
+    Assert.assertEquals("Incorrect priority has been returned", expected,
+        updateApplicationPriority.getApplicationPriority().getPriority());
+  }
 }