Prechádzať zdrojové kódy

YARN-948. Changed ResourceManager to validate the release container list before actually releasing them. Contributed by Omkar Vinit Joshi.
svn merge --ignore-ancestry -c 1508609 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.1-beta@1508611 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli 12 rokov pred
rodič
commit
2c6f8de3b8

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -12,6 +12,9 @@ Release 2.1.1-beta - UNRELEASED
 
   BUG FIXES
 
+    YARN-948. Changed ResourceManager to validate the release container list
+    before actually releasing them. (Omkar Vinit Joshi via vinodkv)
+
 Release 2.1.0-beta - 2013-08-06
 
   INCOMPATIBLE CHANGES

+ 45 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidContainerReleaseException.java

@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.exceptions;
+
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+
+/**
+ * This exception is thrown when an Application Master tries to release
+ * containers not belonging to it using
+ * {@link ApplicationMasterProtocol#allocate(AllocateRequest)} API.
+ */
+public class InvalidContainerReleaseException extends YarnException {
+
+  private static final long serialVersionUID = 13498237L;
+  
+  public InvalidContainerReleaseException(Throwable cause) {
+    super(cause);
+  }
+  
+  public InvalidContainerReleaseException(String message) {
+    super(message);
+  }
+  
+  public InvalidContainerReleaseException(String message,
+      Throwable cause) {
+    super(message, cause);
+  }
+}

+ 10 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
+import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
 import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
 import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -386,7 +387,7 @@ public class ApplicationMasterService extends AbstractService implements
       
       // sanity check
       try {
-        SchedulerUtils.validateResourceRequests(ask,
+        RMServerUtils.validateResourceRequests(ask,
             rScheduler.getMaximumResourceCapability());
       } catch (InvalidResourceRequestException e) {
         LOG.warn("Invalid resource ask by application " + appAttemptId, e);
@@ -394,12 +395,19 @@ public class ApplicationMasterService extends AbstractService implements
       }
       
       try {
-        SchedulerUtils.validateBlacklistRequest(blacklistRequest);
+        RMServerUtils.validateBlacklistRequest(blacklistRequest);
       }  catch (InvalidResourceBlacklistRequestException e) {
         LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
         throw e;
       }
       
+      try {
+        RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
+      } catch (InvalidContainerReleaseException e) {
+        LOG.warn("Invalid container release by application " + appAttemptId, e);
+        throw e;
+      }
+      
       // Send new requests to appAttempt.
       Allocation allocation =
           this.rScheduler.allocate(appAttemptId, ask, release, 

+ 57 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java

@@ -22,8 +22,17 @@ import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 
 /**
  * Utility methods to aid serving RM data through the REST and RPC APIs
@@ -55,4 +64,52 @@ public class RMServerUtils {
     }
     return results;
   }
+  
+  /**
+   * Utility method to validate a list resource requests, by insuring that the
+   * requested memory/vcore is non-negative and not greater than max
+   */
+  public static void validateResourceRequests(List<ResourceRequest> ask,
+      Resource maximumResource) throws InvalidResourceRequestException {
+    for (ResourceRequest resReq : ask) {
+      SchedulerUtils.validateResourceRequest(resReq, maximumResource);
+    }
+  }
+
+  /*
+   * @throw <code>InvalidResourceBlacklistRequestException </code> if the
+   * resource is not able to be added to the blacklist.
+   */
+  public static void validateBlacklistRequest(ResourceBlacklistRequest blacklistRequest) 
+  throws InvalidResourceBlacklistRequestException {
+    if (blacklistRequest != null) {
+      List<String> plus = blacklistRequest.getBlacklistAdditions();
+      if (plus != null && plus.contains(ResourceRequest.ANY)) {
+        throw new InvalidResourceBlacklistRequestException(
+            "Cannot add " + ResourceRequest.ANY + " to the blacklist!");
+      }
+    }
+  }
+
+  /**
+   * It will validate to make sure all the containers belong to correct
+   * application attempt id. If not then it will throw
+   * {@link InvalidContainerReleaseException}
+   * @param containerReleaseList containers to be released as requested by
+   * application master.
+   * @param appAttemptId Application attempt Id
+   * @throws InvalidContainerReleaseException 
+   */
+  public static void
+      validateContainerReleaseRequest(List<ContainerId> containerReleaseList,
+          ApplicationAttemptId appAttemptId)
+          throws InvalidContainerReleaseException {
+    for (ContainerId cId : containerReleaseList) {
+      if (!appAttemptId.equals(cId.getApplicationAttemptId())) {
+        throw new InvalidContainerReleaseException("Cannot release container : "
+            + cId.toString() + " not belonging to this application attempt : "
+            + appAttemptId);
+      }
+    }
+  }
 }

+ 1 - 26
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java

@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -182,30 +183,4 @@ public class SchedulerUtils {
           + ", maxVirtualCores=" + maximumResource.getVirtualCores());
     }
   }
-
-  /**
-   * Utility method to validate a list resource requests, by insuring that the
-   * requested memory/vcore is non-negative and not greater than max
-   */
-  public static void validateResourceRequests(List<ResourceRequest> ask,
-      Resource maximumResource) throws InvalidResourceRequestException {
-    for (ResourceRequest resReq : ask) {
-      validateResourceRequest(resReq, maximumResource);
-    }
-  }
-
-  /*
-   * @throw <code>InvalidResourceBlacklistRequestException </code> if the
-   * resource is not able to be added to the blacklist.
-   */
-  public static void validateBlacklistRequest(ResourceBlacklistRequest blacklistRequest) 
-  throws InvalidResourceBlacklistRequestException {
-    if (blacklistRequest != null) {
-      List<String> plus = blacklistRequest.getBlacklistAdditions();
-      if (plus != null && plus.contains(ResourceRequest.ANY)) {
-        throw new InvalidResourceBlacklistRequestException(
-            "Cannot add " + ResourceRequest.ANY + " to the blacklist!");
-      }
-    }
-  }
 }

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java

@@ -130,6 +130,9 @@ public class MockAM {
     return response;
   }
 
+  public void addContainerToBeReleased(ContainerId containerId) {
+    releases.add(containerId);
+  }
   public AllocateResponse allocate(
       String host, int memory, int numContainers,
       List<ContainerId> releases) throws Exception {

+ 62 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationmasterservice/TestApplicationMasterService.java

@@ -24,7 +24,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -88,4 +91,63 @@ public class TestApplicationMasterService {
     Assert.assertEquals(MockRM.clusterTimeStamp, tokenId.getRMIdentifer());
     rm.stop();
   }
+  
+  @Test(timeout=600000)
+  public void testInvalidContainerReleaseRequest() throws Exception {
+    MockRM rm = new MockRM(conf);
+    
+    try {
+      rm.start();
+
+      // Register node1
+      MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
+
+      // Submit an application
+      RMApp app1 = rm.submitApp(1024);
+
+      // kick the scheduling
+      nm1.nodeHeartbeat(true);
+      RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+      MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+      am1.registerAppAttempt();
+      
+      am1.addRequests(new String[] { "127.0.0.1" }, GB, 1, 1);
+      AllocateResponse alloc1Response = am1.schedule(); // send the request
+
+      // kick the scheduler
+      nm1.nodeHeartbeat(true);
+      while (alloc1Response.getAllocatedContainers().size() < 1) {
+        LOG.info("Waiting for containers to be created for app 1...");
+        Thread.sleep(1000);
+        alloc1Response = am1.schedule();
+      }
+      
+      Assert.assertTrue(alloc1Response.getAllocatedContainers().size() > 0);
+      
+      RMApp app2 = rm.submitApp(1024);
+      
+      nm1.nodeHeartbeat(true);
+      RMAppAttempt attempt2 = app2.getCurrentAppAttempt();
+      MockAM am2 = rm.sendAMLaunched(attempt2.getAppAttemptId());
+      am2.registerAppAttempt();
+      
+      // Now trying to release container allocated for app1 -> appAttempt1.
+      ContainerId cId = alloc1Response.getAllocatedContainers().get(0).getId();
+      am2.addContainerToBeReleased(cId);
+      try {
+        am2.schedule();
+        Assert.fail("Exception was expected!!");
+      } catch (InvalidContainerReleaseException e) {
+        StringBuilder sb = new StringBuilder("Cannot release container : ");
+        sb.append(cId.toString());
+        sb.append(" not belonging to this application attempt : ");
+        sb.append(attempt2.getAppAttemptId().toString());
+        Assert.assertTrue(e.getMessage().contains(sb.toString()));
+      }
+    } finally {
+      if (rm != null) {
+        rm.stop();
+      }
+    }
+  }
 }