Browse Source

YARN-5327. API changes required to support recurring reservations in the YARN ReservationSystem. (Sangeetha Abdu Jyothi via Subru).

Subru Krishnan 8 years ago
parent
commit
b930dc3ec0

+ 48 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ReservationDefinition.java

@@ -37,13 +37,24 @@ public abstract class ReservationDefinition {
   @Public
   @Unstable
   public static ReservationDefinition newInstance(long arrival, long deadline,
-      ReservationRequests reservationRequests, String name) {
+      ReservationRequests reservationRequests, String name,
+      String recurrenceExpression) {
     ReservationDefinition rDefinition =
         Records.newRecord(ReservationDefinition.class);
     rDefinition.setArrival(arrival);
     rDefinition.setDeadline(deadline);
     rDefinition.setReservationRequests(reservationRequests);
     rDefinition.setReservationName(name);
+    rDefinition.setRecurrenceExpression(recurrenceExpression);
+    return rDefinition;
+  }
+
+  @Public
+  @Unstable
+  public static ReservationDefinition newInstance(long arrival, long deadline,
+      ReservationRequests reservationRequests, String name) {
+    ReservationDefinition rDefinition =
+        newInstance(arrival, deadline, reservationRequests, name, "0");
     return rDefinition;
   }
 
@@ -134,4 +145,40 @@ public abstract class ReservationDefinition {
   @Evolving
   public abstract void setReservationName(String name);
 
+  /**
+   * Get the recurrence of this reservation representing the time period of
+   * the periodic job. Currently, only long values are supported. Later,
+   * support for regular expressions denoting arbitrary recurrence patterns
+   * (e.g., every Tuesday and Thursday) will be added.
+   * Recurrence is represented in milliseconds for periodic jobs.
+   * Recurrence is 0 for non-periodic jobs. Periodic jobs are valid until they
+   * are explicitly cancelled and have higher priority than non-periodic jobs
+   * (during initial placement and replanning). Periodic job allocations are
+   * consistent across runs (flexibility in allocation is leveraged only during
+   * initial placement, allocations remain consistent thereafter).
+   *
+   * @return recurrence of this reservation
+   */
+  @Public
+  @Evolving
+  public abstract String getRecurrenceExpression();
+
+  /**
+   * Set the recurrence of this reservation representing the time period of
+   * the periodic job. Currently, only long values are supported. Later,
+   * support for regular expressions denoting arbitrary recurrence patterns
+   * (e.g., every Tuesday and Thursday) will be added.
+   * Recurrence is represented in milliseconds for periodic jobs.
+   * Recurrence is 0 for non-periodic jobs. Periodic jobs are valid until they
+   * are explicitly cancelled and have higher priority than non-periodic jobs
+   * (during initial placement and replanning). Periodic job allocations are
+   * consistent across runs (flexibility in allocation is leveraged only during
+   * initial placement, allocations remain consistent thereafter).
+   *
+   * @param recurrenceExpression recurrence interval of this reservation
+   */
+  @Public
+  @Evolving
+  public abstract void setRecurrenceExpression(String recurrenceExpression);
+
 }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto

@@ -477,6 +477,7 @@ message ReservationDefinitionProto {
   optional int64 arrival = 2;
   optional int64 deadline = 3;
   optional string reservation_name = 4;
+  optional string recurrence_expression = 5 [default = "0"];
 }
 
 message ResourceAllocationRequestProto {

+ 19 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ReservationDefinitionPBImpl.java

@@ -162,8 +162,9 @@ public class ReservationDefinitionPBImpl extends ReservationDefinition {
   @Override
   public String toString() {
     return "{Arrival: " + getArrival() + ", Deadline: " + getDeadline()
-        + ", Reservation Name: " + getReservationName() + ", Resources: "
-        + getReservationRequests() + "}";
+        + ", Reservation Name: " + getReservationName()
+        + ", Recurrence expression: " + getRecurrenceExpression()
+        + ", Resources: " + getReservationRequests() + "}";
   }
 
   @Override
@@ -181,4 +182,20 @@ public class ReservationDefinitionPBImpl extends ReservationDefinition {
     return false;
   }
 
+  @Override
+  public String getRecurrenceExpression() {
+    ReservationDefinitionProtoOrBuilder p = viaProto ? proto : builder;
+    if (p.hasRecurrenceExpression()) {
+      String recurrenceExpression = p.getRecurrenceExpression();
+      return recurrenceExpression;
+    } else {
+      return "0";
+    }
+  }
+
+  @Override
+  public void setRecurrenceExpression(String recurrenceExpression) {
+    builder.setRecurrenceExpression(recurrenceExpression);
+  }
+
 }

+ 14 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java

@@ -155,6 +155,20 @@ public class ReservationInputValidator {
           "validate reservation input definition", "ClientRMService", message);
       throw RPCUtil.getRemoteException(message);
     }
+    // check that the recurrence is a positive long value.
+    String recurrenceExpression = contract.getRecurrenceExpression();
+    try {
+      Long recurrence = Long.parseLong(recurrenceExpression);
+      if (recurrence < 0) {
+        message = "Negative Period : " + recurrenceExpression + ". Please try"
+            + " again with a non-negative long value as period";
+        throw RPCUtil.getRemoteException(message);
+      }
+    } catch (NumberFormatException e) {
+      message = "Invalid period " + recurrenceExpression + ". Please try"
+          + " again with a non-negative long value as period";
+      throw RPCUtil.getRemoteException(message);
+    }
   }
 
   private Plan getPlanFromQueue(ReservationSystem reservationSystem, String

+ 61 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationInputValidator.java

@@ -269,6 +269,59 @@ public class TestReservationInputValidator {
     }
   }
 
+  @Test
+  public void testSubmitReservationValidRecurrenceExpression() {
+    ReservationSubmissionRequest request =
+        createSimpleReservationSubmissionRequest(1, 1, 1, 5, 3, "600000");
+    plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+    } catch (YarnException e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(plan);
+  }
+
+  @Test
+  public void testSubmitReservationNegativeRecurrenceExpression() {
+    ReservationSubmissionRequest request =
+        createSimpleReservationSubmissionRequest(1, 1, 1, 5, 3, "-1234");
+    plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert.assertTrue(message
+          .startsWith("Negative Period : "));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testSubmitReservationInvalidRecurrenceExpression() {
+    ReservationSubmissionRequest request =
+        createSimpleReservationSubmissionRequest(1, 1, 1, 5, 3, "123abc");
+    plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert.assertTrue(message
+          .startsWith("Invalid period "));
+      LOG.info(message);
+    }
+  }
+
   @Test
   public void testUpdateReservationNormal() {
     ReservationUpdateRequest request =
@@ -625,12 +678,20 @@ public class TestReservationInputValidator {
   private ReservationSubmissionRequest createSimpleReservationSubmissionRequest(
       int numRequests, int numContainers, long arrival, long deadline,
       long duration) {
+    return createSimpleReservationSubmissionRequest(numRequests, numContainers,
+        arrival, deadline, duration, "0");
+  }
+
+  private ReservationSubmissionRequest createSimpleReservationSubmissionRequest(
+      int numRequests, int numContainers, long arrival, long deadline,
+      long duration, String recurrence) {
     // create a request with a single atomic ask
     ReservationSubmissionRequest request =
         new ReservationSubmissionRequestPBImpl();
     ReservationDefinition rDef = new ReservationDefinitionPBImpl();
     rDef.setArrival(arrival);
     rDef.setDeadline(deadline);
+    rDef.setRecurrenceExpression(recurrence);
     if (numRequests > 0) {
       ReservationRequests reqs = new ReservationRequestsPBImpl();
       rDef.setReservationRequests(reqs);