Selaa lähdekoodia

YARN-2080. Integrating reservation system with ResourceManager and client-RM protocol. Contributed by Subru Krishnan and Carlo Curino.

subru 10 vuotta sitten
vanhempi
commit
7b5b71f789

+ 3 - 0
YARN-1051-CHANGES.txt

@@ -20,3 +20,6 @@ on user reservations. (Carlo Curino and Subru Krishnan via curino)
 
 YARN-1712. Plan follower that synchronizes the current state of reservation
 subsystem with the scheduler. (Subru Krishnan and Carlo Curino  via subru)
+
+YARN-2080. Integrating reservation system with ResourceManager and 
+client-RM protocol. (Subru Krishnan and Carlo Curino  via subru)

+ 0 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java

@@ -949,5 +949,4 @@ public class TestYarnClient {
             ReservationSystemTestUtil.reservationQ);
     return request;
   }
-
 }

+ 0 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java

@@ -110,10 +110,7 @@ public interface RMContext {
   
   long getEpoch();
 
-  ReservationSystem getReservationSystem();
-
   boolean isSchedulerReadyForAllocatingContainers();
 
   ReservationSystem getReservationSystem();
-
 }

+ 12 - 70
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -79,18 +79,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
-
-import com.google.common.annotations.VisibleForTesting;
 
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public class RMAppImpl implements RMApp, Recoverable {
@@ -119,12 +113,6 @@ public class RMAppImpl implements RMApp, Recoverable {
   private final String applicationType;
   private final Set<String> applicationTags;
 
-  private final long attemptFailuresValidityInterval;
-
-  private Clock systemClock;
-
-  private boolean isNumAttemptsBeyondThreshold = false;
-
   // Mutable fields
   private long startTime;
   private long finishTime = 0;
@@ -343,8 +331,6 @@ public class RMAppImpl implements RMApp, Recoverable {
       ApplicationMasterService masterService, long submitTime,
       String applicationType, Set<String> applicationTags) {
 
-    this.systemClock = new SystemClock();
-
     this.applicationId = applicationId;
     this.name = name;
     this.rmContext = rmContext;
@@ -357,7 +343,7 @@ public class RMAppImpl implements RMApp, Recoverable {
     this.scheduler = scheduler;
     this.masterService = masterService;
     this.submitTime = submitTime;
-    this.startTime = this.systemClock.getTime();
+    this.startTime = System.currentTimeMillis();
     this.applicationType = applicationType;
     this.applicationTags = applicationTags;
 
@@ -375,9 +361,6 @@ public class RMAppImpl implements RMApp, Recoverable {
       this.maxAppAttempts = individualMaxAppAttempts;
     }
 
-    this.attemptFailuresValidityInterval =
-        submissionContext.getAttemptFailuresValidityInterval();
-
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     this.readLock = lock.readLock();
     this.writeLock = lock.writeLock();
@@ -385,7 +368,6 @@ public class RMAppImpl implements RMApp, Recoverable {
     this.stateMachine = stateMachineFactory.make(this);
 
     rmContext.getRMApplicationHistoryWriter().applicationStarted(this);
-    rmContext.getSystemMetricsPublisher().appCreated(this, startTime);
   }
 
   @Override
@@ -547,7 +529,6 @@ public class RMAppImpl implements RMApp, Recoverable {
       float progress = 0.0f;
       org.apache.hadoop.yarn.api.records.Token amrmToken = null;
       if (allowAccess) {
-        trackingUrl = getDefaultProxyTrackingUrl();
         if (this.currentAttempt != null) {
           currentApplicationAttemptId = this.currentAttempt.getAppAttemptId();
           trackingUrl = this.currentAttempt.getTrackingUrl();
@@ -608,20 +589,6 @@ public class RMAppImpl implements RMApp, Recoverable {
     }
   }
 
-  private String getDefaultProxyTrackingUrl() {
-    try {
-      final String scheme = WebAppUtils.getHttpSchemePrefix(conf);
-      String proxy = WebAppUtils.getProxyHostAndPort(conf);
-      URI proxyUri = ProxyUriUtils.getUriFromAMUrl(scheme, proxy);
-      URI result = ProxyUriUtils.getProxyUri(null, proxyUri, applicationId);
-      return result.toASCIIString();
-    } catch (URISyntaxException e) {
-      LOG.warn("Could not generate default proxy tracking URL for "
-          + applicationId);
-      return UNAVAILABLE;
-    }
-  }
-
   @Override
   public long getFinishTime() {
     this.readLock.lock();
@@ -663,20 +630,6 @@ public class RMAppImpl implements RMApp, Recoverable {
     }
   }
 
-  @Override
-  public String getOriginalTrackingUrl() {
-    this.readLock.lock();
-    
-    try {
-      if (this.currentAttempt != null) {
-        return this.currentAttempt.getOriginalTrackingUrl();
-      }
-      return null;
-    } finally {
-      this.readLock.unlock();
-    }
-  }
-
   @Override
   public StringBuilder getDiagnostics() {
     this.readLock.lock();
@@ -935,7 +888,7 @@ public class RMAppImpl implements RMApp, Recoverable {
       msg = "Unmanaged application " + this.getApplicationId()
               + " failed due to " + failedEvent.getDiagnostics()
               + ". Failing the application.";
-    } else if (this.isNumAttemptsBeyondThreshold) {
+    } else if (getNumFailedAppAttempts() >= this.maxAppAttempts) {
       msg = "Application " + this.getApplicationId() + " failed "
               + this.maxAppAttempts + " times due to "
               + failedEvent.getDiagnostics() + ". Failing the application.";
@@ -968,7 +921,7 @@ public class RMAppImpl implements RMApp, Recoverable {
       RMAppState stateToBeStored) {
     rememberTargetTransitions(event, transitionToDo, targetFinalState);
     this.stateBeforeFinalSaving = getState();
-    this.storedFinishTime = this.systemClock.getTime();
+    this.storedFinishTime = System.currentTimeMillis();
 
     LOG.info("Updating application " + this.applicationId
         + " with final state: " + this.targetedFinalState);
@@ -1135,7 +1088,7 @@ public class RMAppImpl implements RMApp, Recoverable {
       }
       app.finishTime = app.storedFinishTime;
       if (app.finishTime == 0 ) {
-        app.finishTime = app.systemClock.getTime();
+        app.finishTime = System.currentTimeMillis();
       }
       // Recovered apps that are completed were not added to scheduler, so no
       // need to remove them from scheduler.
@@ -1149,23 +1102,16 @@ public class RMAppImpl implements RMApp, Recoverable {
 
       app.rmContext.getRMApplicationHistoryWriter()
           .applicationFinished(app, finalState);
-      app.rmContext.getSystemMetricsPublisher()
-          .appFinished(app, finalState, app.finishTime);
     };
   }
 
   private int getNumFailedAppAttempts() {
     int completedAttempts = 0;
-    long endTime = this.systemClock.getTime();
     // Do not count AM preemption, hardware failures or NM resync
     // as attempt failure.
     for (RMAppAttempt attempt : attempts.values()) {
       if (attempt.shouldCountTowardsMaxAttemptRetry()) {
-        if (this.attemptFailuresValidityInterval <= 0
-            || (attempt.getFinishTime() > endTime
-                - this.attemptFailuresValidityInterval)) {
-          completedAttempts++;
-        }
+        completedAttempts++;
       }
     }
     return completedAttempts;
@@ -1182,10 +1128,9 @@ public class RMAppImpl implements RMApp, Recoverable {
 
     @Override
     public RMAppState transition(RMAppImpl app, RMAppEvent event) {
-      int numberOfFailure = app.getNumFailedAppAttempts();
       if (!app.submissionContext.getUnmanagedAM()
-          && numberOfFailure < app.maxAppAttempts) {
-        boolean transferStateFromPreviousAttempt;
+          && app.getNumFailedAppAttempts() < app.maxAppAttempts) {
+        boolean transferStateFromPreviousAttempt = false;
         RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
         transferStateFromPreviousAttempt =
             failedEvent.getTransferStateFromPreviousAttempt();
@@ -1195,16 +1140,13 @@ public class RMAppImpl implements RMApp, Recoverable {
         // Transfer the state from the previous attempt to the current attempt.
         // Note that the previous failed attempt may still be collecting the
         // container events from the scheduler and update its data structures
-        // before the new attempt is created. We always transferState for
-        // finished containers so that they can be acked to NM,
-        // but when pulling finished container we will check this flag again.
-        ((RMAppAttemptImpl) app.currentAttempt)
-          .transferStateFromPreviousAttempt(oldAttempt);
+        // before the new attempt is created.
+        if (transferStateFromPreviousAttempt) {
+          ((RMAppAttemptImpl) app.currentAttempt)
+            .transferStateFromPreviousAttempt(oldAttempt);
+        }
         return initialState;
       } else {
-        if (numberOfFailure >= app.maxAppAttempts) {
-          app.isNumAttemptsBeyondThreshold = true;
-        }
         app.rememberTargetTransitionsAndStoreState(event,
           new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED,
           RMAppState.FAILED);