ソースを参照

Rebasing with trunk to create YARN-1051 merge patch

subru 10 年 前
コミット
1c7d1df1de

+ 70 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -79,12 +79,18 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+
+import com.google.common.annotations.VisibleForTesting;
 
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public class RMAppImpl implements RMApp, Recoverable {
@@ -113,6 +119,12 @@ public class RMAppImpl implements RMApp, Recoverable {
   private final String applicationType;
   private final Set<String> applicationTags;
 
+  private final long attemptFailuresValidityInterval;
+
+  private Clock systemClock;
+
+  private boolean isNumAttemptsBeyondThreshold = false;
+
   // Mutable fields
   private long startTime;
   private long finishTime = 0;
@@ -331,6 +343,8 @@ public class RMAppImpl implements RMApp, Recoverable {
       ApplicationMasterService masterService, long submitTime,
       String applicationType, Set<String> applicationTags) {
 
+    this.systemClock = new SystemClock();
+
     this.applicationId = applicationId;
     this.name = name;
     this.rmContext = rmContext;
@@ -343,7 +357,7 @@ public class RMAppImpl implements RMApp, Recoverable {
     this.scheduler = scheduler;
     this.masterService = masterService;
     this.submitTime = submitTime;
-    this.startTime = System.currentTimeMillis();
+    this.startTime = this.systemClock.getTime();
     this.applicationType = applicationType;
     this.applicationTags = applicationTags;
 
@@ -361,6 +375,9 @@ public class RMAppImpl implements RMApp, Recoverable {
       this.maxAppAttempts = individualMaxAppAttempts;
     }
 
+    this.attemptFailuresValidityInterval =
+        submissionContext.getAttemptFailuresValidityInterval();
+
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     this.readLock = lock.readLock();
     this.writeLock = lock.writeLock();
@@ -368,6 +385,7 @@ public class RMAppImpl implements RMApp, Recoverable {
     this.stateMachine = stateMachineFactory.make(this);
 
     rmContext.getRMApplicationHistoryWriter().applicationStarted(this);
+    rmContext.getSystemMetricsPublisher().appCreated(this, startTime);
   }
 
   @Override
@@ -529,6 +547,7 @@ public class RMAppImpl implements RMApp, Recoverable {
       float progress = 0.0f;
       org.apache.hadoop.yarn.api.records.Token amrmToken = null;
       if (allowAccess) {
+        trackingUrl = getDefaultProxyTrackingUrl();
         if (this.currentAttempt != null) {
           currentApplicationAttemptId = this.currentAttempt.getAppAttemptId();
           trackingUrl = this.currentAttempt.getTrackingUrl();
@@ -589,6 +608,20 @@ public class RMAppImpl implements RMApp, Recoverable {
     }
   }
 
+  private String getDefaultProxyTrackingUrl() {
+    try {
+      final String scheme = WebAppUtils.getHttpSchemePrefix(conf);
+      String proxy = WebAppUtils.getProxyHostAndPort(conf);
+      URI proxyUri = ProxyUriUtils.getUriFromAMUrl(scheme, proxy);
+      URI result = ProxyUriUtils.getProxyUri(null, proxyUri, applicationId);
+      return result.toASCIIString();
+    } catch (URISyntaxException e) {
+      LOG.warn("Could not generate default proxy tracking URL for "
+          + applicationId);
+      return UNAVAILABLE;
+    }
+  }
+
   @Override
   public long getFinishTime() {
     this.readLock.lock();
@@ -630,6 +663,20 @@ public class RMAppImpl implements RMApp, Recoverable {
     }
   }
 
+  @Override
+  public String getOriginalTrackingUrl() {
+    this.readLock.lock();
+    
+    try {
+      if (this.currentAttempt != null) {
+        return this.currentAttempt.getOriginalTrackingUrl();
+      }
+      return null;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
   @Override
   public StringBuilder getDiagnostics() {
     this.readLock.lock();
@@ -888,7 +935,7 @@ public class RMAppImpl implements RMApp, Recoverable {
       msg = "Unmanaged application " + this.getApplicationId()
               + " failed due to " + failedEvent.getDiagnostics()
               + ". Failing the application.";
-    } else if (getNumFailedAppAttempts() >= this.maxAppAttempts) {
+    } else if (this.isNumAttemptsBeyondThreshold) {
       msg = "Application " + this.getApplicationId() + " failed "
               + this.maxAppAttempts + " times due to "
               + failedEvent.getDiagnostics() + ". Failing the application.";
@@ -921,7 +968,7 @@ public class RMAppImpl implements RMApp, Recoverable {
       RMAppState stateToBeStored) {
     rememberTargetTransitions(event, transitionToDo, targetFinalState);
     this.stateBeforeFinalSaving = getState();
-    this.storedFinishTime = System.currentTimeMillis();
+    this.storedFinishTime = this.systemClock.getTime();
 
     LOG.info("Updating application " + this.applicationId
         + " with final state: " + this.targetedFinalState);
@@ -1088,7 +1135,7 @@ public class RMAppImpl implements RMApp, Recoverable {
       }
       app.finishTime = app.storedFinishTime;
       if (app.finishTime == 0 ) {
-        app.finishTime = System.currentTimeMillis();
+        app.finishTime = app.systemClock.getTime();
       }
       // Recovered apps that are completed were not added to scheduler, so no
       // need to remove them from scheduler.
@@ -1102,16 +1149,23 @@ public class RMAppImpl implements RMApp, Recoverable {
 
       app.rmContext.getRMApplicationHistoryWriter()
           .applicationFinished(app, finalState);
+      app.rmContext.getSystemMetricsPublisher()
+          .appFinished(app, finalState, app.finishTime);
     };
   }
 
   private int getNumFailedAppAttempts() {
     int completedAttempts = 0;
+    long endTime = this.systemClock.getTime();
     // Do not count AM preemption, hardware failures or NM resync
     // as attempt failure.
     for (RMAppAttempt attempt : attempts.values()) {
       if (attempt.shouldCountTowardsMaxAttemptRetry()) {
-        completedAttempts++;
+        if (this.attemptFailuresValidityInterval <= 0
+            || (attempt.getFinishTime() > endTime
+                - this.attemptFailuresValidityInterval)) {
+          completedAttempts++;
+        }
       }
     }
     return completedAttempts;
@@ -1128,9 +1182,10 @@ public class RMAppImpl implements RMApp, Recoverable {
 
     @Override
     public RMAppState transition(RMAppImpl app, RMAppEvent event) {
+      int numberOfFailure = app.getNumFailedAppAttempts();
       if (!app.submissionContext.getUnmanagedAM()
-          && app.getNumFailedAppAttempts() < app.maxAppAttempts) {
-        boolean transferStateFromPreviousAttempt = false;
+          && numberOfFailure < app.maxAppAttempts) {
+        boolean transferStateFromPreviousAttempt;
         RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
         transferStateFromPreviousAttempt =
             failedEvent.getTransferStateFromPreviousAttempt();
@@ -1140,13 +1195,16 @@ public class RMAppImpl implements RMApp, Recoverable {
         // Transfer the state from the previous attempt to the current attempt.
         // Note that the previous failed attempt may still be collecting the
         // container events from the scheduler and update its data structures
-        // before the new attempt is created.
-        if (transferStateFromPreviousAttempt) {
-          ((RMAppAttemptImpl) app.currentAttempt)
-            .transferStateFromPreviousAttempt(oldAttempt);
-        }
+        // before the new attempt is created. We always transferState for
+        // finished containers so that they can be acked to NM,
+        // but when pulling finished container we will check this flag again.
+        ((RMAppAttemptImpl) app.currentAttempt)
+          .transferStateFromPreviousAttempt(oldAttempt);
         return initialState;
       } else {
+        if (numberOfFailure >= app.maxAppAttempts) {
+          app.isNumAttemptsBeyondThreshold = true;
+        }
         app.rememberTargetTransitionsAndStoreState(event,
           new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED,
           RMAppState.FAILED);