瀏覽代碼

Fix some invalid transitions in the RM. Contributed by Vinod KV

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-279@1139443 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 14 年之前
父節點
當前提交
4b5705378d
共有 20 個文件被更改，包括 327 次插入和 736 次刪除
  1. 2 1
      mapreduce/.eclipse.templates/.classpath
  2. 2 0
      mapreduce/CHANGES.txt
  3. 0 2
      mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationStatus.java
  4. 0 11
      mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationStatusPBImpl.java
  5. 0 1
      mapreduce/yarn/yarn-api/src/main/proto/yarn_protos.proto
  6. 0 1
      mapreduce/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java
  7. 2 0
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
  8. 3 3
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLauncher.java
  9. 44 285
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java
  10. 140 94
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterInfo.java
  11. 7 86
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java
  12. 1 0
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
  13. 7 5
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java
  14. 10 8
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java
  15. 11 84
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
  16. 27 89
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java
  17. 17 17
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java
  18. 20 15
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterExpiry.java
  19. 17 16
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterLauncher.java
  20. 17 18
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java

+ 2 - 1
mapreduce/.eclipse.templates/.classpath

@@ -57,11 +57,12 @@
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/slf4j-api-1.5.8.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/slf4j-simple-1.5.8.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/xmlenc-0.52.jar"/>
-        <classpathentry kind="lib" path="build/ivy/lib/Hadoop/test/mockito-all-1.8.0.jar"/>
+	<classpathentry kind="lib" path="build/ivy/lib/Hadoop/test/mockito-all-1.8.0.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/index/common/lucene-core-2.3.1.jar"/>
 	<classpathentry kind="lib" path="build/test/classes"/>
 	<classpathentry kind="lib" path="build/classes"/>
 	<classpathentry kind="lib" path="conf"/>
 	<classpathentry kind="lib" path="build/ivy/lib/sqoop/common/commons-io-1.4.jar"/>
+	<classpathentry kind="var" path="M2_REPO"/>
 	<classpathentry kind="output" path="build/eclipse-classes"/>
 </classpath>

+ 2 - 0
mapreduce/CHANGES.txt

@@ -4,6 +4,8 @@ Trunk (unreleased changes)
 
 
     MAPREDUCE-279
+
+    Fix some invalid transitions in the RM. (vinodkv via ddas)
     
     Fix diagnostics display for more than 100 apps in RM. (llu)
 

+ 0 - 2
mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationStatus.java

@@ -22,10 +22,8 @@ public interface ApplicationStatus {
   int getResponseId();
   ApplicationId getApplicationId();
   float getProgress();
-  long getLastSeen();
   
   void setResponseId(int id);
   void setApplicationId(ApplicationId applicationID);
   void setProgress(float progress);
-  void setLastSeen(long lastSeen);
 }

+ 0 - 11
mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationStatusPBImpl.java

@@ -104,17 +104,6 @@ public class ApplicationStatusPBImpl extends ProtoBase<ApplicationStatusProto> i
     maybeInitBuilder();
     builder.setProgress((progress));
   }
-  @Override
-  public long getLastSeen() {
-    ApplicationStatusProtoOrBuilder p = viaProto ? proto : builder;
-    return (p.getLastSeen());
-  }
-
-  @Override
-  public void setLastSeen(long lastSeen) {
-    maybeInitBuilder();
-    builder.setLastSeen((lastSeen));
-  }
 
   private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
     return new ApplicationIdPBImpl(p);

+ 0 - 1
mapreduce/yarn/yarn-api/src/main/proto/yarn_protos.proto

@@ -66,7 +66,6 @@ message ApplicationStatusProto {
   optional int32 response_id = 1;
   optional ApplicationIdProto application_id = 2;
   optional float progress = 3;
-  optional int64 last_seen = 4;
 }
 
 message ApplicationMasterProto {

+ 0 - 1
mapreduce/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java

@@ -155,7 +155,6 @@ public class MockApps {
   public static ApplicationStatus newAppStatus() {
     ApplicationStatus status = Records.newRecord(ApplicationStatus.class);
     status.setProgress((float)Math.random());
-    status.setLastSeen(System.currentTimeMillis());
     return status;
   }
 

+ 2 - 0
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

@@ -117,6 +117,7 @@ AMRMProtocol, EventHandler<ASMEvent<ApplicationTrackerEventType>> {
   
   @Override
   public RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest request) throws YarnRemoteException {
+    // TODO: What if duplicate register due to lost RPCs
     ApplicationMaster applicationMaster = request.getApplicationMaster();
     try {
       applicationsManager.registerApplicationMaster(applicationMaster);
@@ -137,6 +138,7 @@ AMRMProtocol, EventHandler<ASMEvent<ApplicationTrackerEventType>> {
 
   @Override
   public FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest request) throws YarnRemoteException {
+    // TODO: What if duplicate finish due to lost RPCs
     ApplicationMaster applicationMaster = request.getApplicationMaster();
     try {
       applicationsManager.finishApplicationMaster(applicationMaster);

+ 3 - 3
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLauncher.java

@@ -65,7 +65,6 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationFinishEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
 
@@ -264,7 +263,8 @@ public class AMLauncher implements Runnable {
         LOG.info("Error launching ", ie);
         eventType = ApplicationEventType.LAUNCH_FAILED;
       }
-      handler.handle(new ASMEvent<ApplicationEventType>(eventType,  master));
+      handler.handle(new ApplicationMasterInfoEvent(eventType, master
+          .getApplicationID()));
       break;
     case CLEANUP:
       try {
@@ -273,7 +273,7 @@ public class AMLauncher implements Runnable {
       } catch(IOException ie) {
         LOG.info("Error cleaning master ", ie);
       }
-      handler.handle(new ApplicationFinishEvent(master,
+      handler.handle(new ApplicationFinishEvent(master.getApplicationID(),
           ApplicationState.COMPLETED)); // Doesn't matter what state you send :) :(
       break;
     default:

+ 44 - 285
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java

@@ -20,13 +20,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -34,26 +31,19 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationMaster;
-import org.apache.hadoop.yarn.api.records.ApplicationState;
 import org.apache.hadoop.yarn.api.records.ApplicationStatus;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationFinishEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.ApplicationInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
-import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.service.CompositeService;
 
 /**
  * This class tracks the application masters that are running. It tracks
@@ -62,15 +52,12 @@ import org.apache.hadoop.yarn.service.AbstractService;
  */
 @Evolving
 @Private
-public class AMTracker extends AbstractService  implements EventHandler<ASMEvent
-<ApplicationEventType>>, Recoverable {
+public class AMTracker extends CompositeService implements Recoverable {
   private static final Log LOG = LogFactory.getLog(AMTracker.class);
   private AMLivelinessMonitor amLivelinessMonitor;
-  private long amExpiryInterval; 
   @SuppressWarnings("rawtypes")
   private EventHandler handler;
   private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-  private int amMaxRetries;
 
   private final RMContext rmContext;
 
@@ -78,141 +65,43 @@ public class AMTracker extends AbstractService  implements EventHandler<ASMEvent
     new ConcurrentHashMap<ApplicationId, ApplicationMasterInfo>();
 
   private final ApplicationsStore appsStore;
-  
-  private TreeSet<ApplicationStatus> amExpiryQueue =
-    new TreeSet<ApplicationStatus>(
-        new Comparator<ApplicationStatus>() {
-          public int compare(ApplicationStatus p1, ApplicationStatus p2) {
-            if (p1.getLastSeen() < p2.getLastSeen()) {
-              return -1;
-            } else if (p1.getLastSeen() > p2.getLastSeen()) {
-              return 1;
-            } else {
-              return (p1.getApplicationId().getId() -
-                  p2.getApplicationId().getId());
-            }
-          }
-        }
-    );
 
   public AMTracker(RMContext rmContext) {
     super(AMTracker.class.getName());
-    this.amLivelinessMonitor = new AMLivelinessMonitor();
     this.rmContext = rmContext;
     this.appsStore = rmContext.getApplicationsStore();
+    this.handler = rmContext.getDispatcher().getEventHandler();
+    this.amLivelinessMonitor = new AMLivelinessMonitor(this.handler);
+    addService(this.amLivelinessMonitor);
   }
 
   @Override
   public void init(Configuration conf) {
+    this.rmContext.getDispatcher().register(ApplicationEventType.class,
+        new ApplicationEventDispatcher());
     super.init(conf);
-    this.handler = rmContext.getDispatcher().getEventHandler();
-    this.amExpiryInterval = conf.getLong(YarnConfiguration.AM_EXPIRY_INTERVAL, 
-        RMConfig.DEFAULT_AM_EXPIRY_INTERVAL);
-    LOG.info("AM expiry interval: " + this.amExpiryInterval);
-    this.amMaxRetries =  conf.getInt(RMConfig.AM_MAX_RETRIES, 
-        RMConfig.DEFAULT_AM_MAX_RETRIES);
-    LOG.info("AM max retries: " + this.amMaxRetries);
-    this.amLivelinessMonitor.setMonitoringInterval(conf.getLong(
-        RMConfig.AMLIVELINESS_MONITORING_INTERVAL,
-        RMConfig.DEFAULT_AMLIVELINESS_MONITORING_INTERVAL));
-    this.rmContext.getDispatcher().register(ApplicationEventType.class, this);
-  }
-
-  @Override
-  public void start() {   
-    super.start();
-    amLivelinessMonitor.start();
   }
 
-  /**
-   * This class runs continuosly to track the application masters
-   * that might be dead.
-   */
-  private class AMLivelinessMonitor extends Thread {
-    private volatile boolean stop = false;
-    private long monitoringInterval =
-        RMConfig.DEFAULT_AMLIVELINESS_MONITORING_INTERVAL;
+  private final class ApplicationEventDispatcher implements
+      EventHandler<ApplicationMasterInfoEvent> {
 
-    public AMLivelinessMonitor() {
-      super("ApplicationsManager:" + AMLivelinessMonitor.class.getName());
-    }
-
-    public void setMonitoringInterval(long interval) {
-      this.monitoringInterval = interval;
+    public ApplicationEventDispatcher() {
     }
 
     @Override
-    public void run() {
-
-      /* the expiry queue does not need to be in sync with applications,
-       * if an applications in the expiry queue cannot be found in applications
-       * its alright. We do not want to hold a lock on applications while going
-       * through the expiry queue.
-       */
-      List<ApplicationId> expired = new ArrayList<ApplicationId>();
-      while (!stop) {
-        ApplicationStatus leastRecent;
-        long now = System.currentTimeMillis();
-        expired.clear();
-        synchronized(amExpiryQueue) {
-          while ((amExpiryQueue.size() > 0) &&
-              (leastRecent = amExpiryQueue.first()) != null &&
-              ((now - leastRecent.getLastSeen()) > 
-              amExpiryInterval)) {
-            amExpiryQueue.remove(leastRecent);
-            ApplicationMasterInfo info;
-            synchronized(applications) {
-              info = applications.get(leastRecent.getApplicationId());
-            }
-            if (info == null) {
-              continue;
-            }
-            ApplicationStatus status = info.getStatus();
-            if ((now - status.getLastSeen()) > amExpiryInterval) {
-              expired.add(status.getApplicationId());
-            } else {
-              amExpiryQueue.add(status);
-            }
-          }
-        }
-        expireAMs(expired);
-        try {
-          Thread.sleep(this.monitoringInterval);
-        } catch (InterruptedException e) {
-          LOG.warn(this.getClass().getName() + " interrupted. Returning.");
-          return;
-        }
-      }
-    }
-
-    public void shutdown() {
-      stop = true;
-    }
-  }
-
-  private void expireAMs(List<ApplicationId> toExpire) {
-    for (ApplicationId app: toExpire) {
-      ApplicationMasterInfo am = null;
+    public void handle(ApplicationMasterInfoEvent event) {
+      ApplicationId appID = event.getApplicationId();
+      ApplicationMasterInfo masterInfo = null;
       synchronized (applications) {
-        am = applications.get(app);
+        masterInfo = applications.get(appID);
+      }
+      try {
+        masterInfo.handle(event);
+      } catch (Throwable t) {
+        LOG.error("Error in handling event type " + event.getType()
+            + " for application " + event.getApplicationId());
       }
-      LOG.info("Expiring the Application " + app);
-      handler.handle(new ASMEvent<ApplicationEventType>
-      (ApplicationEventType.EXPIRE, am));
-    }
-  }
-
-  @Override
-  public void stop() {
-    amLivelinessMonitor.interrupt();
-    amLivelinessMonitor.shutdown();
-    try {
-      amLivelinessMonitor.join();
-    } catch (InterruptedException ie) {
-      LOG.info(amLivelinessMonitor.getName() + " interrupted during join ",
-          ie);
     }
-    super.stop();
   }
 
   public void addMaster(String user,  ApplicationSubmissionContext 
@@ -220,8 +109,9 @@ public class AMTracker extends AbstractService  implements EventHandler<ASMEvent
     
     ApplicationStore appStore = appsStore.createApplicationStore(submissionContext.getApplicationId(),
         submissionContext);
-    ApplicationMasterInfo applicationMaster = new ApplicationMasterInfo(rmContext, 
-        user, submissionContext, clientToken, appStore);
+    ApplicationMasterInfo applicationMaster = new ApplicationMasterInfo(
+        rmContext, getConfig(), user, submissionContext, clientToken,
+        appStore, this.amLivelinessMonitor);
     synchronized(applications) {
       applications.put(applicationMaster.getApplicationID(), applicationMaster);
     }
@@ -229,22 +119,15 @@ public class AMTracker extends AbstractService  implements EventHandler<ASMEvent
   }
   
   public void runApplication(ApplicationId applicationId) {
-    ApplicationMasterInfo masterInfo = null;
-    synchronized (applications) {
-      masterInfo = applications.get(applicationId);
-    }
-    rmContext.getDispatcher().getSyncHandler().handle(new ASMEvent<ApplicationEventType>(
-        ApplicationEventType.ALLOCATE, masterInfo));
-    
+    rmContext.getDispatcher().getSyncHandler().handle(
+        new ApplicationMasterInfoEvent(ApplicationEventType.ALLOCATE,
+            applicationId));
   }
   
   public void finishNonRunnableApplication(ApplicationId applicationId) {
-    ApplicationMasterInfo masterInfo = null;
-    synchronized (applications) {
-      masterInfo = applications.get(applicationId);
-    }
-    rmContext.getDispatcher().getSyncHandler().handle(new ASMEvent<ApplicationEventType>(
-        ApplicationEventType.FAILED, masterInfo));
+    rmContext.getDispatcher().getSyncHandler().handle(
+        new ApplicationMasterInfoEvent(ApplicationEventType.FAILED,
+            applicationId));
    }
 
   public void finish(ApplicationMaster remoteApplicationMaster) {
@@ -263,7 +146,7 @@ public class AMTracker extends AbstractService  implements EventHandler<ASMEvent
         remoteApplicationMaster.getDiagnostics());
 
     rmContext.getDispatcher().getEventHandler().handle(
-        new ApplicationFinishEvent(masterInfo, remoteApplicationMaster
+        new ApplicationFinishEvent(applicationId, remoteApplicationMaster
             .getState()));
   }
 
@@ -280,6 +163,7 @@ public class AMTracker extends AbstractService  implements EventHandler<ASMEvent
   public void remove(ApplicationId applicationId) {
     synchronized (applications) {
       //applications.remove(applicationId);
+      this.amLivelinessMonitor.unRegister(applicationId);
     }
   }
 
@@ -291,155 +175,27 @@ public class AMTracker extends AbstractService  implements EventHandler<ASMEvent
       }
     }
     return allAMs;
-  } 
-
-  private void addForTracking(AppContext master) {
-    LOG.info("Adding application master for tracking " + master.getMaster());
-    synchronized (amExpiryQueue) {
-      amExpiryQueue.add(master.getStatus());
-    }
   }
 
   public void kill(ApplicationId applicationID) {
-    ApplicationMasterInfo masterInfo = null;
-
-    synchronized(applications) {
-      masterInfo = applications.get(applicationID);
-    }
-    handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.KILL, 
-        masterInfo));
-  }
-
-  /*
-   * this class is used for passing status context to the application state
-   * machine.
-   */
-  private static class TrackerAppContext implements AppContext {
-    private final ApplicationId appID;
-    private final ApplicationMaster master;
-    private final UnsupportedOperationException notimplemented;
-
-    public TrackerAppContext(
-        ApplicationId appId, ApplicationMaster master) {
-      this.appID = appId;
-      this.master = master;
-      this.notimplemented = new NotImplementedException();
-    }
-
-    @Override
-    public ApplicationSubmissionContext getSubmissionContext() {
-      throw notimplemented;
-    }
-    @Override
-    public Resource getResource() {
-      throw notimplemented;
-    }
-    @Override
-    public ApplicationId getApplicationID() {
-      return appID;
-    }
-    @Override
-    public ApplicationStatus getStatus() {
-      return master.getStatus();
-    }
-    @Override
-    public ApplicationMaster getMaster() {
-      return master;
-    }
-    @Override
-    public Container getMasterContainer() {
-      throw notimplemented;
-    }
-    @Override
-    public String getUser() {   
-      throw notimplemented;
-    }
-   
-    @Override
-    public String getName() {
-      throw notimplemented;
-    }
-    @Override
-    public String getQueue() {
-      throw notimplemented;
-    }
-
-    @Override
-    public int getFailedCount() {
-      throw notimplemented;
-    }
-
-    @Override
-    public ApplicationStore getStore() {
-     throw notimplemented;
-    }
-
-    @Override
-    public long getStartTime() {
-      throw notimplemented;
-    }
-
-    @Override
-    public long getFinishTime() {
-      throw notimplemented;
-    }
+    handler.handle(new ApplicationMasterInfoEvent(ApplicationEventType.KILL,
+        applicationID));
   }
 
   public void heartBeat(ApplicationStatus status) {
     ApplicationMaster master = recordFactory.newRecordInstance(ApplicationMaster.class);
     master.setStatus(status);
     master.setApplicationId(status.getApplicationId());
-    TrackerAppContext context = new TrackerAppContext(status.getApplicationId(), master);
-    handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.STATUSUPDATE, 
-        context));
+    handler.handle(new ApplicationMasterStatusUpdateEvent(status));
   }
 
   public void registerMaster(ApplicationMaster applicationMaster) {
-    applicationMaster.getStatus().setLastSeen(System.currentTimeMillis());
     ApplicationMasterInfo master = null;
     synchronized(applications) {
       master = applications.get(applicationMaster.getApplicationId());
     }
     LOG.info("AM registration " + master.getMaster());
-    TrackerAppContext registrationContext = new TrackerAppContext(
-        master.getApplicationID(), applicationMaster);
-    handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.
-        REGISTERED,  registrationContext));
-  }
-
-  @Override
-  public void handle(ASMEvent<ApplicationEventType> event) {
-    ApplicationId appID = event.getAppContext().getApplicationID();
-    ApplicationMasterInfo masterInfo = null;
-    synchronized(applications) {
-      masterInfo = applications.get(appID);
-    }
-    try {
-      masterInfo.handle(event);
-    } catch(Throwable t) {
-      LOG.error("Error in handling event type " + event.getType() + " for application " 
-          + event.getAppContext().getApplicationID());
-    }
-    /* we need to launch the applicaiton master on allocated transition */
-    if (masterInfo.getState() == ApplicationState.ALLOCATED) {
-      handler.handle(new ASMEvent<ApplicationEventType>(
-          ApplicationEventType.LAUNCH, masterInfo));
-    }
-    if (masterInfo.getState() == ApplicationState.LAUNCHED) {
-      addForTracking(masterInfo);
-    }
-
-    /* check to see if the AM is an EXPIRED_PENDING state and start off the cycle again */
-    if (masterInfo.getState() == ApplicationState.EXPIRED_PENDING) {
-      /* check to see if the number of retries are reached or not */
-      if (masterInfo.getFailedCount() < this.amMaxRetries) {
-        handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.ALLOCATE,
-            masterInfo));
-      } else {
-        handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.
-            FAILED_MAX_RETRIES, masterInfo));
-      }
-    }
+    handler.handle(new ApplicationMasterRegistrationEvent(applicationMaster));
   }
 
   @Override
@@ -449,11 +205,13 @@ public class AMTracker extends AbstractService  implements EventHandler<ASMEvent
       ApplicationInfo appInfo = entry.getValue();
       ApplicationMasterInfo masterInfo = null;
       try {
-        masterInfo = new ApplicationMasterInfo(this.rmContext,
-      
-          appInfo.getApplicationSubmissionContext().getUser(), appInfo.getApplicationSubmissionContext(), 
-          appInfo.getApplicationMaster().getClientToken(), 
-          this.appsStore.createApplicationStore(appId, appInfo.getApplicationSubmissionContext()));
+        masterInfo = new ApplicationMasterInfo(this.rmContext, getConfig(),
+            appInfo.getApplicationSubmissionContext().getUser(), appInfo
+                .getApplicationSubmissionContext(), appInfo
+                .getApplicationMaster().getClientToken(), this.appsStore
+                .createApplicationStore(appId, appInfo
+                    .getApplicationSubmissionContext()),
+            this.amLivelinessMonitor);
       } catch(IOException ie) {
         //ignore
       }
@@ -470,7 +228,8 @@ public class AMTracker extends AbstractService  implements EventHandler<ASMEvent
       master.setStatus(storedAppMaster.getStatus());
       master.setState(storedAppMaster.getState());
       applications.put(appId, masterInfo);
-      handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.RECOVER, masterInfo));
+      handler.handle(new ApplicationMasterInfoEvent(
+          ApplicationEventType.RECOVER, appId));
     }
   }
 }

+ 140 - 94
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterInfo.java

@@ -22,11 +22,15 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationMaster;
 import org.apache.hadoop.yarn.api.records.ApplicationState;
@@ -37,9 +41,9 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationFinishEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
@@ -59,7 +63,8 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
  */
 @Private
 @Unstable
-public class ApplicationMasterInfo implements AppContext, EventHandler<ASMEvent<ApplicationEventType>> {
+public class ApplicationMasterInfo implements AppContext,
+    EventHandler<ApplicationMasterInfoEvent> {
   private static final Log LOG = LogFactory.getLog(ApplicationMasterInfo.class);
   private final ApplicationSubmissionContext submissionContext;
   private ApplicationMaster master;
@@ -74,7 +79,11 @@ public class ApplicationMasterInfo implements AppContext, EventHandler<ASMEvent<
   private static String DIAGNOSTIC_KILL_APPLICATION = "Application was killed.";
   private static String DIAGNOSTIC_AM_FAILED = "Application Master failed";
   private static String DIAGNOSTIC_AM_LAUNCH_FAILED = "Application Master failed to launch";
-  
+
+  private final int amMaxRetries;
+  private final AMLivelinessMonitor amLivelinessMonitor;
+  private final ReadLock readLock;
+  private final WriteLock writeLock;
   private int numFailed = 0;
   private final ApplicationStore appStore;
   
@@ -93,15 +102,13 @@ public class ApplicationMasterInfo implements AppContext, EventHandler<ASMEvent<
   private final LaunchedTransition launchedTransition = new LaunchedTransition();
   private final FailedLaunchTransition failedLaunchTransition = new FailedLaunchTransition();
   
-  private final StateMachine<ApplicationState, ApplicationEventType, 
-  ASMEvent<ApplicationEventType>> stateMachine;
+  private final StateMachine<ApplicationState,
+                ApplicationEventType, ApplicationMasterInfoEvent> stateMachine;
 
-  private final StateMachineFactory<ApplicationMasterInfo,
-  ApplicationState, ApplicationEventType, ASMEvent<ApplicationEventType>> stateMachineFactory 
-  
-  = new StateMachineFactory
-  <ApplicationMasterInfo, ApplicationState, ApplicationEventType, ASMEvent<ApplicationEventType>>
-  (ApplicationState.PENDING)
+  private final StateMachineFactory<ApplicationMasterInfo, ApplicationState,
+    ApplicationEventType, ApplicationMasterInfoEvent> stateMachineFactory
+          = new StateMachineFactory<ApplicationMasterInfo, ApplicationState,
+    ApplicationEventType, ApplicationMasterInfoEvent>(ApplicationState.PENDING)
 
   // Transitions from PENDING State
   .addTransition(ApplicationState.PENDING, ApplicationState.ALLOCATING,
@@ -147,8 +154,6 @@ public class ApplicationMasterInfo implements AppContext, EventHandler<ASMEvent<
   // Transitions from LAUNCHED State
   .addTransition(ApplicationState.LAUNCHED, ApplicationState.CLEANUP,
       ApplicationEventType.KILL, killTransition)
-  .addTransition(ApplicationState.LAUNCHED, ApplicationState.FAILED,
-      ApplicationEventType.EXPIRE, expireTransition)
    .addTransition(ApplicationState.LAUNCHED, ApplicationState.RUNNING,
       ApplicationEventType.REGISTERED, new RegisterTransition())
   .addTransition(ApplicationState.LAUNCHED, ApplicationState.LAUNCHED,
@@ -189,8 +194,7 @@ public class ApplicationMasterInfo implements AppContext, EventHandler<ASMEvent<
 
   // Transitions from COMPLETED State
   .addTransition(ApplicationState.COMPLETED, ApplicationState.COMPLETED,
-      EnumSet.of(ApplicationEventType.EXPIRE,
-          ApplicationEventType.FINISH, ApplicationEventType.KILL,
+      EnumSet.of(ApplicationEventType.FINISH, ApplicationEventType.KILL,
           ApplicationEventType.RECOVER))
 
   // Transitions from FAILED State
@@ -209,8 +213,10 @@ public class ApplicationMasterInfo implements AppContext, EventHandler<ASMEvent<
 
 
 
-  public ApplicationMasterInfo(RMContext context, String user,
-  ApplicationSubmissionContext submissionContext, String clientToken, ApplicationStore appStore) {
+  public ApplicationMasterInfo(RMContext context, Configuration conf,
+      String user, ApplicationSubmissionContext submissionContext,
+      String clientToken, ApplicationStore appStore,
+      AMLivelinessMonitor amLivelinessMonitor) {
     this.user = user;
     this.handler = context.getDispatcher().getEventHandler();
     this.syncHandler = context.getDispatcher().getSyncHandler();
@@ -228,6 +234,14 @@ public class ApplicationMasterInfo implements AppContext, EventHandler<ASMEvent<
     master.setDiagnostics("");
     this.appStore = appStore;
     this.startTime = System.currentTimeMillis();
+    this.amMaxRetries =  conf.getInt(RMConfig.AM_MAX_RETRIES, 
+        RMConfig.DEFAULT_AM_MAX_RETRIES);
+    LOG.info("AM max retries: " + this.amMaxRetries);
+    this.amLivelinessMonitor = amLivelinessMonitor;
+
+    ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    this.readLock = readWriteLock.readLock();
+    this.writeLock = readWriteLock.writeLock();
   }
 
   @Override
@@ -307,12 +321,13 @@ public class ApplicationMasterInfo implements AppContext, EventHandler<ASMEvent<
   }
   
   /* the application master completed successfully */
-  private static class DoneTransition implements 
-    MultipleArcTransition<ApplicationMasterInfo, ASMEvent<ApplicationEventType>, ApplicationState> {
+  private static class DoneTransition
+      implements
+      MultipleArcTransition<ApplicationMasterInfo, ApplicationMasterInfoEvent, ApplicationState> {
 
     @Override
     public ApplicationState transition(ApplicationMasterInfo masterInfo,
-    ASMEvent<ApplicationEventType> event) {
+        ApplicationMasterInfoEvent event) {
       masterInfo.handler.handle(new ASMEvent<SNEventType>(
         SNEventType.CLEANUP, masterInfo));
       masterInfo.handler.handle(new ASMEvent<AMLauncherEventType>(
@@ -320,26 +335,29 @@ public class ApplicationMasterInfo implements AppContext, EventHandler<ASMEvent<
       masterInfo.handler.handle(new ASMEvent<ApplicationTrackerEventType>(
       ApplicationTrackerEventType.REMOVE, masterInfo));
       masterInfo.finishTime = System.currentTimeMillis();
+
+      masterInfo.amLivelinessMonitor.unRegister(event.getApplicationId());
+
       ApplicationFinishEvent finishEvent = (ApplicationFinishEvent) event;
       return finishEvent.getFinalApplicationState();
     }
   }
   
-  private static class AllocatingKillTransition implements 
-  SingleArcTransition<ApplicationMasterInfo, ASMEvent<ApplicationEventType>> {
+  private static class AllocatingKillTransition implements
+      SingleArcTransition<ApplicationMasterInfo, ApplicationMasterInfoEvent> {
     @Override
     public void transition(ApplicationMasterInfo masterInfo,
-    ASMEvent<ApplicationEventType> event) {
+        ApplicationMasterInfoEvent event) {
       masterInfo.handler.handle(new ASMEvent<ApplicationTrackerEventType>(ApplicationTrackerEventType.REMOVE,
           masterInfo));
     }
   }
   
   private static class KillTransition implements
-  SingleArcTransition<ApplicationMasterInfo, ASMEvent<ApplicationEventType>> {
+      SingleArcTransition<ApplicationMasterInfo, ApplicationMasterInfoEvent> {
     @Override
     public void transition(ApplicationMasterInfo masterInfo,
-    ASMEvent<ApplicationEventType> event) {
+        ApplicationMasterInfoEvent event) {
       masterInfo.finishTime = System.currentTimeMillis();
       masterInfo.getMaster().setDiagnostics(DIAGNOSTIC_KILL_APPLICATION);
       masterInfo.handler.handle(new ASMEvent<SNEventType>(SNEventType.CLEANUP, masterInfo));
@@ -349,12 +367,12 @@ public class ApplicationMasterInfo implements AppContext, EventHandler<ASMEvent<
     }
   }
 
-  private static class RecoverLaunchTransition implements  SingleArcTransition
-  <ApplicationMasterInfo, ASMEvent<ApplicationEventType>> {
+  private static class RecoverLaunchTransition implements
+      SingleArcTransition<ApplicationMasterInfo, ApplicationMasterInfoEvent> {
 
     @Override
     public void transition(ApplicationMasterInfo masterInfo,
-        ASMEvent<ApplicationEventType> event) {
+        ApplicationMasterInfoEvent event) {
       masterInfo.syncHandler.handle(new ASMEvent<ApplicationTrackerEventType>(
           ApplicationTrackerEventType.ADD, masterInfo));
         
@@ -363,11 +381,11 @@ public class ApplicationMasterInfo implements AppContext, EventHandler<ASMEvent<
     }
   }
   
-  private static class FailedLaunchTransition implements 
-  SingleArcTransition<ApplicationMasterInfo, ASMEvent<ApplicationEventType>> {
+  private static class FailedLaunchTransition implements
+      SingleArcTransition<ApplicationMasterInfo, ApplicationMasterInfoEvent> {
     @Override
     public void transition(ApplicationMasterInfo masterInfo,
-    ASMEvent<ApplicationEventType> event) {
+        ApplicationMasterInfoEvent event) {
       masterInfo.finishTime = System.currentTimeMillis();
       masterInfo.getMaster().setDiagnostics(DIAGNOSTIC_AM_LAUNCH_FAILED);
       masterInfo.handler.handle(new ASMEvent<SNEventType>(
@@ -376,74 +394,82 @@ public class ApplicationMasterInfo implements AppContext, EventHandler<ASMEvent<
   }
   
   private static class LaunchTransition implements
-  SingleArcTransition<ApplicationMasterInfo, ASMEvent<ApplicationEventType>> {
+      SingleArcTransition<ApplicationMasterInfo, ApplicationMasterInfoEvent> {
     @Override
     public void transition(ApplicationMasterInfo masterInfo,
-    ASMEvent<ApplicationEventType> event) {
+        ApplicationMasterInfoEvent event) {
       masterInfo.handler.handle(new ASMEvent<AMLauncherEventType>(
       AMLauncherEventType.LAUNCH, masterInfo));
     }
   }
   
   private static class RecoverRunningTransition implements
-  SingleArcTransition<ApplicationMasterInfo, ASMEvent<ApplicationEventType>> {
+      SingleArcTransition<ApplicationMasterInfo, ApplicationMasterInfoEvent> {
     @Override
     public void transition(ApplicationMasterInfo masterInfo,
-    ASMEvent<ApplicationEventType> event) {
+        ApplicationMasterInfoEvent event) {
       masterInfo.syncHandler.handle(new ASMEvent<ApplicationTrackerEventType>(
           ApplicationTrackerEventType.ADD, masterInfo));
       /* make sure the time stamp is update else expiry thread will expire this */
-      masterInfo.master.getStatus().setLastSeen(System.currentTimeMillis());
+      masterInfo.amLivelinessMonitor.receivedPing(event.getApplicationId());
     }
   }
   
   private static class RecoverLaunchedTransition implements
-  SingleArcTransition<ApplicationMasterInfo, ASMEvent<ApplicationEventType>> {
+      SingleArcTransition<ApplicationMasterInfo, ApplicationMasterInfoEvent> {
     @Override
     public void transition(ApplicationMasterInfo masterInfo,
-    ASMEvent<ApplicationEventType> event) {
+        ApplicationMasterInfoEvent event) {
       masterInfo.syncHandler.handle(new ASMEvent<ApplicationTrackerEventType>(
           ApplicationTrackerEventType.ADD, masterInfo));
         
-      /* make sure the time stamp is update else expiry thread will expire this */
-      masterInfo.master.getStatus().setLastSeen(System.currentTimeMillis());
+      masterInfo.amLivelinessMonitor.register(event.getApplicationId());
     }
   }
 
 
   private static class LaunchedTransition implements
-  SingleArcTransition<ApplicationMasterInfo, ASMEvent<ApplicationEventType>> {
+      SingleArcTransition<ApplicationMasterInfo, ApplicationMasterInfoEvent> {
     @Override
     public void transition(ApplicationMasterInfo masterInfo,
-    ASMEvent<ApplicationEventType> event) {
-      /* make sure the time stamp is update else expiry thread will expire this */
-      masterInfo.master.getStatus().setLastSeen(System.currentTimeMillis());
+        ApplicationMasterInfoEvent event) {
+      masterInfo.amLivelinessMonitor.register(event.getApplicationId());
     }
   }
 
-  private static class  ExpireTransition implements
-  SingleArcTransition<ApplicationMasterInfo, ASMEvent<ApplicationEventType>> {
+  private static class ExpireTransition implements
+      SingleArcTransition<ApplicationMasterInfo, ApplicationMasterInfoEvent> {
     @Override
     public void transition(ApplicationMasterInfo masterInfo,
-    ASMEvent<ApplicationEventType> event) {
+        ApplicationMasterInfoEvent event) {
       /* for now this is the same as killed transition but will change later */
       masterInfo.handler.handle(new ASMEvent<SNEventType>(SNEventType.CLEANUP,
         masterInfo));
       masterInfo.handler.handle(new ASMEvent<AMLauncherEventType>(
         AMLauncherEventType.CLEANUP, masterInfo));
       masterInfo.handler.handle(new ASMEvent<ApplicationTrackerEventType>(
-      ApplicationTrackerEventType.EXPIRE, masterInfo));
+          ApplicationTrackerEventType.EXPIRE, masterInfo));
       masterInfo.numFailed++;
+
+      /* check to see if the number of retries are reached or not */
+      if (masterInfo.getFailedCount() < masterInfo.amMaxRetries) {
+        masterInfo.handler.handle(new ApplicationMasterInfoEvent(
+            ApplicationEventType.ALLOCATE, event.getApplicationId()));
+      } else {
+        masterInfo.handler.handle(new ApplicationMasterInfoEvent(
+            ApplicationEventType.FAILED_MAX_RETRIES, masterInfo
+                .getApplicationID()));
+      }
     }
   }
 
 
   /* Transition to schedule again on a container launch failure for AM */
   private static class ScheduleTransition implements 
-  SingleArcTransition<ApplicationMasterInfo, ASMEvent<ApplicationEventType>> {
+  SingleArcTransition<ApplicationMasterInfo, ApplicationMasterInfoEvent> {
     @Override
     public void transition(ApplicationMasterInfo masterInfo,
-    ASMEvent<ApplicationEventType> event) {
+        ApplicationMasterInfoEvent event) {
       masterInfo.masterContainer = null;
       /* schedule for a slot */
       masterInfo.handler.handle(new ASMEvent<SNEventType>(SNEventType.SCHEDULE,
@@ -452,50 +478,61 @@ public class ApplicationMasterInfo implements AppContext, EventHandler<ASMEvent<
   }
   
   /* Transition to start the process of allocating for the AM container */
-  private static class AllocateTransition implements 
-  SingleArcTransition<ApplicationMasterInfo, ASMEvent<ApplicationEventType>> {
+  private static class AllocateTransition implements
+      SingleArcTransition<ApplicationMasterInfo, ApplicationMasterInfoEvent> {
     @Override
     public void transition(ApplicationMasterInfo masterInfo,
-    ASMEvent<ApplicationEventType> event) {
+        ApplicationMasterInfoEvent event) {
       /* notify tracking applications that an applicaiton has been added */
-      masterInfo.handler.handle(new ASMEvent<ApplicationTrackerEventType>(
+      // TODO: For now, changing to synchHandler. Instead we should use register/deregister.
+      masterInfo.syncHandler.handle(new ASMEvent<ApplicationTrackerEventType>(
         ApplicationTrackerEventType.ADD, masterInfo));
       
       /* schedule for a slot */
-      masterInfo.handler.handle(new ASMEvent<SNEventType>(SNEventType.SCHEDULE,
-      masterInfo));
+      masterInfo.handler.handle(new ASMEvent<SNEventType>(
+          SNEventType.SCHEDULE, masterInfo));
     }
   }
   
   /* Transition taken once a container has been allocated for the application master */
-  private static class AllocatedTransition implements SingleArcTransition<ApplicationMasterInfo,
-  ASMEvent<ApplicationEventType>> {
+  private static class AllocatedTransition
+      implements
+      SingleArcTransition<ApplicationMasterInfo, ApplicationMasterInfoEvent> {
 
     @Override
     public void transition(ApplicationMasterInfo masterInfo,
-    ASMEvent<ApplicationEventType> event) {
+        ApplicationMasterInfoEvent event) {
       /* set the container that was generated by the scheduler negotiator */
-      masterInfo.masterContainer = event.getAppContext().getMasterContainer();
+      ApplicationMasterAllocatedEvent allocatedEvent = 
+         (ApplicationMasterAllocatedEvent) event;
+      masterInfo.masterContainer = allocatedEvent.getMasterContainer();
       try {
         masterInfo.appStore.storeMasterContainer(masterInfo.masterContainer);
       } catch(IOException ie) {
         //TODO ignore for now fix later.
       }
+
+      /* we need to launch the applicaiton master on allocated transition */
+      masterInfo.handler.handle(new ApplicationMasterInfoEvent(
+          ApplicationEventType.LAUNCH, masterInfo.getApplicationID()));
     }    
   }
 
-  private static class RegisterTransition implements  SingleArcTransition<ApplicationMasterInfo,
-  ASMEvent<ApplicationEventType>> {
+  private static class RegisterTransition implements
+      SingleArcTransition<ApplicationMasterInfo, ApplicationMasterInfoEvent> {
     @Override
     public void transition(ApplicationMasterInfo masterInfo,
-    ASMEvent<ApplicationEventType> event) {
-      ApplicationMaster registeredMaster = event.getAppContext().getMaster();
+        ApplicationMasterInfoEvent event) {
+      ApplicationMasterRegistrationEvent registrationEvent =
+        (ApplicationMasterRegistrationEvent) event;
+      ApplicationMaster registeredMaster = registrationEvent
+          .getApplicationMaster();
       masterInfo.master.setHost(registeredMaster.getHost());
       masterInfo.master.setTrackingUrl(registeredMaster.getTrackingUrl());
       masterInfo.master.setRpcPort(registeredMaster.getRpcPort());
       masterInfo.master.setStatus(registeredMaster.getStatus());
       masterInfo.master.getStatus().setProgress(0.0f);
-      masterInfo.master.getStatus().setLastSeen(System.currentTimeMillis());
+      masterInfo.amLivelinessMonitor.receivedPing(event.getApplicationId());
       try {
         masterInfo.appStore.updateApplicationState(masterInfo.master);
       } catch(IOException ie) {
@@ -506,52 +543,61 @@ public class ApplicationMasterInfo implements AppContext, EventHandler<ASMEvent<
 
   /* transition to finishing state on a cleanup; for now it's not used, but we will need it
    * later */
-  private static class FailedTransition implements 
-  SingleArcTransition<ApplicationMasterInfo, ASMEvent<ApplicationEventType>> {
+  private static class FailedTransition implements
+      SingleArcTransition<ApplicationMasterInfo, ApplicationMasterInfoEvent> {
 
     @Override
     public void transition(ApplicationMasterInfo masterInfo,
-    ASMEvent<ApplicationEventType> event) {
+        ApplicationMasterInfoEvent event) {
       LOG.info("Failed application: " + masterInfo.getApplicationID());
     } 
   }
 
 
   /* Just a status update transition */
-  private static class StatusUpdateTransition implements 
-  SingleArcTransition<ApplicationMasterInfo, ASMEvent<ApplicationEventType>> {
+  private static class StatusUpdateTransition implements
+      SingleArcTransition<ApplicationMasterInfo, ApplicationMasterInfoEvent> {
 
     @Override
     public void transition(ApplicationMasterInfo masterInfo,
-    ASMEvent<ApplicationEventType> event) {
-      masterInfo.master.setStatus(event.getAppContext().getStatus());
-      masterInfo.master.getStatus().setLastSeen(System.currentTimeMillis());
+        ApplicationMasterInfoEvent event) {
+      ApplicationMasterStatusUpdateEvent statusUpdateEvent = 
+        (ApplicationMasterStatusUpdateEvent) event;
+      masterInfo.master.setStatus(statusUpdateEvent.getApplicationStatus());
+      masterInfo.amLivelinessMonitor.receivedPing(event.getApplicationId());
     }
   }
 
   @Override
-  public synchronized void handle(ASMEvent<ApplicationEventType> event) {
-    ApplicationId appID =  event.getAppContext().getApplicationID();
-    LOG.info("Processing event for " + appID +  " of type " + event.getType());
-    final ApplicationState oldState = getState();
-    try {
-      /* keep the master in sync with the state machine */
-      stateMachine.doTransition(event.getType(), event);
-      master.setState(stateMachine.getCurrentState());
-      LOG.info("State is " +  stateMachine.getCurrentState());
-    } catch (InvalidStateTransitonException e) {
-      LOG.error("Can't handle this event at current state", e);
-      /* TODO fail the application on the failed transition */
-    }
+  public synchronized void handle(ApplicationMasterInfoEvent event) {
+
+    this.writeLock.lock();
+
     try {
-      appStore.updateApplicationState(master);
-    } catch(IOException ie) {
-      //TODO ignore for now
-    }
-    if (oldState != getState()) {
-      LOG.info(appID + " State change from " 
-      + oldState + " to "
-      + getState());
+      ApplicationId appID = event.getApplicationId();
+      LOG.info("Processing event for " + appID + " of type "
+          + event.getType());
+      final ApplicationState oldState = getState();
+      try {
+        /* keep the master in sync with the state machine */
+        stateMachine.doTransition(event.getType(), event);
+        master.setState(stateMachine.getCurrentState());
+        LOG.info("State is " + stateMachine.getCurrentState());
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state", e);
+        /* TODO fail the application on the failed transition */
+      }
+      try {
+        appStore.updateApplicationState(master);
+      } catch (IOException ie) {
+        // TODO ignore for now
+      }
+      if (oldState != getState()) {
+        LOG.info(appID + " State change from " + oldState + " to "
+            + getState());
+      }
+    } finally {
+      this.writeLock.unlock();
     }
   }
 }

+ 7 - 86
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java

@@ -166,12 +166,10 @@ class SchedulerNegotiator extends AbstractService implements EventHandler<ASMEve
               it.remove();
               Container container = containers.get(0);
               
-              LOG.info("Found container " + container + " for AM of " + 
-                  masterInfo.getMaster());
-              SNAppContext snAppContext = new SNAppContext(masterInfo.getApplicationID(),
-                container);
-              handler.handle(new ASMEvent<ApplicationEventType>(
-                ApplicationEventType.ALLOCATED, snAppContext));
+              LOG.info("Found container " + container + " for AM of "
+                  + masterInfo.getMaster());
+              handler.handle(new ApplicationMasterAllocatedEvent(masterInfo
+                  .getApplicationID(), container));
             }
           }
 
@@ -223,8 +221,8 @@ class SchedulerNegotiator extends AbstractService implements EventHandler<ASMEve
         //TODO remove IOException from the scheduler.
         LOG.error("Error while releasing container for AM " + appContext.getApplicationID());
       }
-      handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.RELEASED, 
-          appContext));
+      handler.handle(new ApplicationMasterInfoEvent(
+          ApplicationEventType.RELEASED, appContext.getApplicationID()));
       break;
     case CLEANUP:
       try {
@@ -247,82 +245,5 @@ class SchedulerNegotiator extends AbstractService implements EventHandler<ASMEve
     Container[] containers = new Container[] {masterInfo.getMasterContainer()};
     scheduler.allocate(masterInfo.getMaster().getApplicationId(), 
         EMPTY_ASK, Arrays.asList(containers));
-  }
-  
-  private static class SNAppContext implements AppContext {
-    private final ApplicationId appID;
-    private final Container container;
-    private final UnsupportedOperationException notImplementedException;
-    
-    public SNAppContext(ApplicationId appID, Container container) {
-      this.appID = appID;
-      this.container = container;
-      this.notImplementedException = new UnsupportedOperationException("Not Implemented");
-    }
-    
-    @Override
-    public ApplicationSubmissionContext getSubmissionContext() {
-      throw notImplementedException;
-    }
-
-    @Override
-    public Resource getResource() {
-     throw notImplementedException;
-    }
-
-    @Override
-    public ApplicationId getApplicationID() {
-      return appID;
-    }
-
-    @Override
-    public ApplicationStatus getStatus() {
-      throw notImplementedException;
-    }
-   
-    @Override
-    public ApplicationMaster getMaster() {
-      throw notImplementedException;
-    }
-
-    @Override
-    public Container getMasterContainer() {
-      return container;
-    }
-
-    @Override
-    public String getUser() {
-      throw notImplementedException;
-    }
-
-    @Override
-    public String getName() {
-      throw notImplementedException;
-    }
-
-    @Override
-    public String getQueue() {
-      throw notImplementedException;
-    }
-
-    @Override
-    public int getFailedCount() {
-      throw notImplementedException;
-    }
-
-    @Override
-    public ApplicationStore getStore() {
-      throw notImplementedException;
-    }
-
-    @Override
-    public long getStartTime() {
-      throw notImplementedException;
-    }
-
-    @Override
-    public long getFinishTime() {
-      throw notImplementedException;
-    }
-  }
+  }  
 }

+ 1 - 0
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java

@@ -149,6 +149,7 @@ public class Application {
    * Clear any pending requests from this application.
    */
   public synchronized void clearRequests() {
+    priorities.clear();
     requests.clear();
     LOG.info("Application " + applicationId + " requests cleared");
   }

+ 7 - 5
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java

@@ -23,7 +23,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import junit.framework.TestCase;
+import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -60,7 +60,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 /* a test case that tests the launch failure of an AM */
-public class TestAMLaunchFailure extends TestCase {
+public class TestAMLaunchFailure {
   private static final Log LOG = LogFactory.getLog(TestAMLaunchFailure.class);
   private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   ApplicationsManagerImpl asmImpl;
@@ -162,8 +162,8 @@ public class TestAMLaunchFailure extends TestCase {
               e.printStackTrace();
             }
             context.getDispatcher().getEventHandler().handle(
-                new ASMEvent<ApplicationEventType>(ApplicationEventType.LAUNCHED,
-                    app));
+                new ApplicationMasterInfoEvent(ApplicationEventType.LAUNCHED,
+                    app.getApplicationID()));
           }
         }
       }
@@ -216,9 +216,11 @@ public class TestAMLaunchFailure extends TestCase {
     ApplicationMaster master = asmImpl.getApplicationMaster(appID);
 
     while (master.getState() != ApplicationState.FAILED) {
+      LOG.info("Waiting for application to go to FAILED state."
+          + " Current state is " + master.getState());
       Thread.sleep(200);
       master = asmImpl.getApplicationMaster(appID);
     }
-    assertTrue(master.getState() == ApplicationState.FAILED);
+    Assert.assertEquals(ApplicationState.FAILED, master.getState());
   }
 }

+ 10 - 8
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java

@@ -22,7 +22,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 
-import junit.framework.TestCase;
+import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -50,8 +50,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Test;
 
-public class TestAMRMRPCResponseId extends TestCase {
+public class TestAMRMRPCResponseId {
   private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   ApplicationMasterService amService = null;
   ApplicationTokenSecretManager appTokenManager = new ApplicationTokenSecretManager();
@@ -145,7 +146,8 @@ public class TestAMRMRPCResponseId extends TestCase {
   public void tearDown() {
     
   }
-  
+
+  @Test
   public void testARRMResponseId() throws Exception {
     ApplicationId applicationID = applicationsManager.getNewApplicationID();
     ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
@@ -163,21 +165,21 @@ public class TestAMRMRPCResponseId extends TestCase {
     AllocateRequest allocateRequest = recordFactory.newRecordInstance(AllocateRequest.class);
     allocateRequest.setApplicationStatus(status);
     AMResponse response = amService.allocate(allocateRequest).getAMResponse();
-    assertTrue(response.getResponseId() == 1);
-    assertFalse(response.getReboot());
+    Assert.assertEquals(1, response.getResponseId());
+    Assert.assertFalse(response.getReboot());
     status.setResponseId(response.getResponseId());
     
     allocateRequest.setApplicationStatus(status);
     response = amService.allocate(allocateRequest).getAMResponse();
-    assertTrue(response.getResponseId() == 2);
+    Assert.assertEquals(2, response.getResponseId());
     /* try resending */
     response = amService.allocate(allocateRequest).getAMResponse();
-    assertTrue(response.getResponseId() == 2);
+    Assert.assertEquals(2, response.getResponseId());
     
     /** try sending old **/
     status.setResponseId(0);
     allocateRequest.setApplicationStatus(status);
     response = amService.allocate(allocateRequest).getAMResponse();
-    assertTrue(response.getReboot());
+    Assert.assertTrue(response.getReboot());
   }
 }

+ 11 - 84
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java

@@ -7,7 +7,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import junit.framework.TestCase;
+import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -15,7 +15,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationMaster;
 import org.apache.hadoop.yarn.api.records.ApplicationState;
-import org.apache.hadoop.yarn.api.records.ApplicationStatus;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -39,7 +38,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
@@ -56,7 +54,7 @@ import org.junit.Test;
  * Test to restart the AM on failure.
  *
  */
-public class TestAMRestart extends TestCase {
+public class TestAMRestart {
   private static final Log LOG = LogFactory.getLog(TestAMRestart.class);
   ApplicationsManagerImpl appImpl;
   RMContext asmContext = new ResourceManager.RMContextImpl(new MemStore());
@@ -105,9 +103,9 @@ public class TestAMRestart extends TestCase {
                 } catch (InterruptedException e) {
                 }
               }
-              asmContext.getDispatcher().getEventHandler().handle(new 
-                  ASMEvent<ApplicationEventType>(ApplicationEventType.LAUNCHED,
-                      new TestAppContext(appID)));
+              asmContext.getDispatcher().getEventHandler().handle(
+                  new ApplicationMasterInfoEvent(
+                      ApplicationEventType.LAUNCHED, appID));
               launchNotify.addAndGet(-1);
             }
           }
@@ -255,78 +253,7 @@ public class TestAMRestart extends TestCase {
       Thread.sleep(500);
       count++;
     }
-    assertTrue(masterInfo.getState() == finalState);
-  }
-  
-  private class TestAppContext implements AppContext {
-    private ApplicationId appID;
-   
-    public TestAppContext(ApplicationId appID) {
-      this.appID = appID;
-    }
-    @Override
-    public ApplicationSubmissionContext getSubmissionContext() {
-      return null;
-    }
-
-    @Override
-    public Resource getResource() {
-      return null;
-    }
-
-    @Override
-    public ApplicationId getApplicationID() {
-      return appID;
-    }
-
-    @Override
-    public ApplicationStatus getStatus() {
-      return null;
-    }
-
-    @Override
-    public ApplicationMaster getMaster() {
-      return null;
-    }
-
-    @Override
-    public Container getMasterContainer() {
-      return null;
-    }
-
-    @Override
-    public String getUser() {
-      return null;
-    }
-
-    @Override
-    public String getName() {
-      return null;
-    }
-
-    @Override
-    public String getQueue() {
-      return null;
-    }
-
-    @Override
-    public int getFailedCount() {
-      return 0;
-    }
-    
-    @Override
-    public ApplicationStore getStore() {
-     return StoreFactory.createVoidAppStore();
-    }
-    @Override
-    public long getStartTime() {
-      return 0;
-    }
-    @Override
-    public long getFinishTime() {
-      return 0;
-    }
-    
+    Assert.assertEquals(finalState, masterInfo.getState());
   }
 
   @Test
@@ -345,11 +272,11 @@ public class TestAMRestart extends TestCase {
         schedulerNotify.wait();
       }
     }
-    assertTrue(launcherCleanupCalled == maxFailures);
-    assertTrue(launcherLaunchCalled == maxFailures);
-    assertTrue(schedulerAddApplication == maxFailures);
-    assertTrue(schedulerRemoveApplication == maxFailures);
-    assertTrue(masterInfo.getFailedCount() == maxFailures);
+    Assert.assertEquals(maxFailures, launcherCleanupCalled);
+    Assert.assertEquals(maxFailures, launcherLaunchCalled);
+    Assert.assertEquals(maxFailures, schedulerAddApplication);
+    Assert.assertEquals(maxFailures, schedulerRemoveApplication);
+    Assert.assertEquals(maxFailures, masterInfo.getFailedCount());
     waitForFailed(masterInfo, ApplicationState.FAILED);
     stop = true;
   }

+ 27 - 89
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java

@@ -21,37 +21,31 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import junit.framework.Assert;
-import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationMaster;
 import org.apache.hadoop.yarn.api.records.ApplicationState;
 import org.apache.hadoop.yarn.api.records.ApplicationStatus;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationFinishEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.SNEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-public class TestASMStateMachine extends TestCase {
+public class TestASMStateMachine {
   private static final Log LOG = LogFactory.getLog(TestASMStateMachine.class);
   private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   RMContext context = new ResourceManager.RMContextImpl(new MemStore());
@@ -63,10 +57,10 @@ public class TestASMStateMachine extends TestCase {
   private boolean removedApplication = false;
   private boolean launchCleanupCalled = false;
   private AtomicInteger waitForState = new AtomicInteger();
-
+  private Configuration conf = new Configuration();
   @Before
   public void setUp() {
-    context.getDispatcher().init(new Configuration());
+    context.getDispatcher().init(conf);
     context.getDispatcher().start();
     handler = context.getDispatcher().getEventHandler();
     new DummyAMLaunchEventHandler();
@@ -95,8 +89,8 @@ public class TestASMStateMachine extends TestCase {
         launchCalled = true;
         appcontext = event.getAppContext();
         context.getDispatcher().getEventHandler().handle(
-            new ASMEvent<ApplicationEventType>(ApplicationEventType.LAUNCHED,
-                appcontext));
+            new ApplicationMasterInfoEvent(ApplicationEventType.LAUNCHED,
+                appcontext.getApplicationID()));
         break;
       case CLEANUP:
         launchCleanupCalled = true;
@@ -123,71 +117,14 @@ public class TestASMStateMachine extends TestCase {
         snAllocateReceived = true;
         appContext = event.getAppContext();
         context.getDispatcher().getEventHandler().handle(
-            new ASMEvent<ApplicationEventType>(ApplicationEventType.ALLOCATED,
-                appContext));
+            new ApplicationMasterAllocatedEvent(appContext.getApplicationID(),
+                appContext.getMasterContainer()));
         break;
       }
     }
 
   }
 
-  private static class StatusContext implements AppContext {
-    @Override
-    public ApplicationSubmissionContext getSubmissionContext() {
-      return null;
-    }
-    @Override
-    public Resource getResource() {
-      return null;
-    }
-    @Override
-    public ApplicationId getApplicationID() {
-      return null;
-    }
-    @Override
-    public ApplicationStatus getStatus() {
-      ApplicationStatus status = recordFactory.newRecordInstance(ApplicationStatus.class);
-      status.setLastSeen(-99);
-      return status;
-    }
-    @Override
-    public ApplicationMaster getMaster() {
-      return null;
-    }
-    @Override
-    public Container getMasterContainer() {
-      return null;
-    }
-    @Override
-    public String getUser() {
-      return null;
-    }
-    @Override
-    public String getName() {
-      return null;
-    }
-    @Override
-    public String getQueue() {
-      return null;
-    }
-    @Override
-    public int getFailedCount() {
-      return 0;
-    }
-    @Override
-    public ApplicationStore getStore() {
-      return StoreFactory.createVoidAppStore();
-    }
-    @Override
-    public long getStartTime() {
-      return 0;
-    }
-    @Override
-    public long getFinishTime() {
-      return 0;
-    }
-  }
-
   private class ApplicationTracker implements EventHandler<ASMEvent<ApplicationTrackerEventType>> {
     public ApplicationTracker() {
       context.getDispatcher().register(ApplicationTrackerEventType.class, this);
@@ -206,13 +143,14 @@ public class TestASMStateMachine extends TestCase {
     }
   }
 
-  private class MockAppplicationMasterInfo implements EventHandler<ASMEvent<ApplicationEventType>> {
+  private class MockAppplicationMasterInfo implements
+      EventHandler<ApplicationMasterInfoEvent> {
 
     MockAppplicationMasterInfo() {
       context.getDispatcher().register(ApplicationEventType.class, this);
     }
     @Override
-    public void handle(ASMEvent<ApplicationEventType> event) {
+    public void handle(ApplicationMasterInfoEvent event) {
       LOG.info("The event type is " + event.getType());
     }
   }
@@ -224,7 +162,7 @@ public class TestASMStateMachine extends TestCase {
       Thread.sleep(500);
       count++;
     }
-    assertTrue(masterInfo.getState() == finalState);
+    Assert.assertEquals(finalState, masterInfo.getState());
   } 
   
   /* Test the state machine. 
@@ -237,34 +175,34 @@ public class TestASMStateMachine extends TestCase {
     submissioncontext.getApplicationId().setId(1);
     submissioncontext.getApplicationId().setClusterTimestamp(System.currentTimeMillis());
 
-    ApplicationMasterInfo masterInfo 
-    = new ApplicationMasterInfo(context, "dummyuser", submissioncontext, "dummyToken"
-        , StoreFactory.createVoidAppStore());
+    ApplicationMasterInfo masterInfo = new ApplicationMasterInfo(context,
+        conf, "dummyuser", submissioncontext, "dummyToken", StoreFactory
+            .createVoidAppStore(), new AMLivelinessMonitor(context
+            .getDispatcher().getEventHandler()));
 
     context.getDispatcher().register(ApplicationEventType.class, masterInfo);
-    handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.
-        ALLOCATE, masterInfo));
-
-    waitForState(ApplicationState.ALLOCATED, masterInfo);
-    handler.handle(new ASMEvent<ApplicationEventType>(
-        ApplicationEventType.LAUNCH, masterInfo));
+    handler.handle(new ApplicationMasterInfoEvent(
+        ApplicationEventType.ALLOCATE, submissioncontext.getApplicationId()));
 
     waitForState(ApplicationState.LAUNCHED, masterInfo);
     Assert.assertTrue(snAllocateReceived);
     Assert.assertTrue(launchCalled);
     Assert.assertTrue(addedApplication);
-    handler.handle(new ASMEvent<ApplicationEventType>(
-        ApplicationEventType.REGISTERED, masterInfo));
+    handler
+        .handle(new ApplicationMasterRegistrationEvent(masterInfo.getMaster()));
     waitForState(ApplicationState.RUNNING, masterInfo);
     Assert.assertEquals(ApplicationState.RUNNING, masterInfo.getState());
-    handler.handle(new ASMEvent<ApplicationEventType>(
-        ApplicationEventType.STATUSUPDATE, new StatusContext()));
+
+    ApplicationStatus status = recordFactory
+        .newRecordInstance(ApplicationStatus.class);
+    status.setApplicationId(masterInfo.getApplicationID());
+    handler.handle(new ApplicationMasterStatusUpdateEvent(status));
 
     /* check if the state is still RUNNING */
 
     Assert.assertEquals(ApplicationState.RUNNING, masterInfo.getState());
 
-    handler.handle(new ApplicationFinishEvent(masterInfo,
+    handler.handle(new ApplicationFinishEvent(masterInfo.getApplicationID(),
         ApplicationState.COMPLETED));
     waitForState(ApplicationState.COMPLETED, masterInfo);
     Assert.assertEquals(ApplicationState.COMPLETED, masterInfo.getState());
@@ -274,8 +212,8 @@ public class TestASMStateMachine extends TestCase {
     Assert.assertTrue(removedApplication);
 
     /* check if expiry doesnt make it failed */
-    handler.handle(
-        new ASMEvent<ApplicationEventType>(ApplicationEventType.EXPIRE, masterInfo));
+    handler.handle(new ApplicationMasterInfoEvent(ApplicationEventType.EXPIRE,
+        masterInfo.getApplicationID()));
     Assert.assertEquals(ApplicationState.COMPLETED, masterInfo.getState());   
   }
 }

+ 17 - 17
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java

@@ -25,7 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import junit.framework.TestCase;
+import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -73,7 +73,7 @@ import org.junit.Test;
  *
  */
 @Ignore
-public class TestApplicationCleanup extends TestCase {
+public class TestApplicationCleanup {
   private static final Log LOG = LogFactory.getLog(TestApplicationCleanup.class);
   private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   private AtomicInteger waitForState = new AtomicInteger(0);
@@ -159,9 +159,9 @@ public class TestApplicationCleanup extends TestCase {
           LOG.info("Launcher Launch called");
           launcherLaunchCalled = true;
           appContext = appEvent.getAppContext();
-          context.getDispatcher().getEventHandler().
-          handle(new ASMEvent<ApplicationEventType>(
-              ApplicationEventType.LAUNCHED, appContext));
+          context.getDispatcher().getEventHandler().handle(
+              new ApplicationMasterInfoEvent(ApplicationEventType.LAUNCHED,
+                  appContext.getApplicationID()));
           break;
         default:
           break;
@@ -186,9 +186,9 @@ public class TestApplicationCleanup extends TestCase {
         case SCHEDULE:
           schedulerScheduleCalled = true;
           acontext = appEvent.getAppContext();
-          context.getDispatcher().getEventHandler().
-          handle(new ASMEvent<ApplicationEventType>(
-              ApplicationEventType.ALLOCATED, acontext));
+          context.getDispatcher().getEventHandler().handle(
+              new ApplicationMasterAllocatedEvent(acontext.getApplicationID(),
+                  acontext.getMasterContainer()));
         default:
           break;
         }
@@ -221,7 +221,7 @@ public class TestApplicationCleanup extends TestCase {
       Thread.sleep(500);
       count++;
     }
-    assertTrue(masterInfo.getState() == finalState);
+    Assert.assertEquals(finalState, masterInfo.getState());
   }
 
 
@@ -277,22 +277,22 @@ public class TestApplicationCleanup extends TestCase {
     LOG.info("Available resource on first node" + firstNode.getAvailableResource());
     LOG.info("Available resource on second node" + secondNode.getAvailableResource());
     /* only allocate the containers to the first node */
-    assertTrue(firstNode.getAvailableResource().getMemory() == 
-      (firstNodeMemory - (2*memoryNeeded)));
+    Assert.assertEquals((firstNodeMemory - (2 * memoryNeeded)), firstNode
+        .getAvailableResource().getMemory());
     ApplicationMasterInfo masterInfo = asm.getApplicationMasterInfo(appID);
     asm.finishApplication(appID, UserGroupInformation.getCurrentUser());
     while (asm.launcherCleanupCalled != true) {
       Thread.sleep(500);
     }
-    assertTrue(asm.launcherCleanupCalled == true);
-    assertTrue(asm.launcherLaunchCalled == true);
-    assertTrue(asm.schedulerCleanupCalled == true);
-    assertTrue(asm.schedulerScheduleCalled == true);
+    Assert.assertTrue(asm.launcherCleanupCalled);
+    Assert.assertTrue(asm.launcherLaunchCalled);
+    Assert.assertTrue(asm.schedulerCleanupCalled);
+    Assert.assertTrue(asm.schedulerScheduleCalled);
     /* check for update of completed application */
     clusterTracker.updateListener(firstNode, containers);
     NodeResponse response = firstNode.statusUpdate(containers);
-    assertTrue(response.getFinishedApplications().contains(appID));
+    Assert.assertTrue(response.getFinishedApplications().contains(appID));
     LOG.info("The containers to clean up " + response.getContainersToCleanUp().size());
-    assertTrue(response.getContainersToCleanUp().size() == 2);
+    Assert.assertEquals(2, response.getContainersToCleanUp().size());
   }
 }

+ 20 - 15
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterExpiry.java

@@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import junit.framework.Assert;
-import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -49,7 +48,7 @@ import org.junit.Test;
  * A test case that tests the expiry of the application master.
  * More tests can be added to this. 
  */
-public class TestApplicationMasterExpiry extends TestCase {
+public class TestApplicationMasterExpiry {
   private static final Log LOG = LogFactory.getLog(TestApplicationMasterExpiry.class);
   private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   AMTracker tracker;
@@ -88,16 +87,17 @@ public class TestApplicationMasterExpiry extends TestCase {
   private AtomicInteger expiry = new AtomicInteger();
   private boolean expired = false;
   
-  private class ApplicationEventTypeListener implements EventHandler<ASMEvent<ApplicationEventType>> {
+  private class ApplicationEventTypeListener implements
+      EventHandler<ApplicationMasterInfoEvent> {
     ApplicationEventTypeListener() {
       context.getDispatcher().register(ApplicationEventType.class, this);
     }
     @Override
-    public void handle(ASMEvent<ApplicationEventType> event) {
+    public void handle(ApplicationMasterInfoEvent event) {
       switch(event.getType()) {
       case EXPIRE:
         expired = true;
-        LOG.info("Received expiry from application " + event.getAppContext().getApplicationID());
+        LOG.info("Received expiry from application " + event.getApplicationId());
         synchronized(expiry) {
           expiry.addAndGet(1);
         }
@@ -130,26 +130,31 @@ public class TestApplicationMasterExpiry extends TestCase {
       Thread.sleep(500);
       count++;
     }
-    assertTrue(masterInfo.getState() == finalState);
+    Assert.assertEquals(finalState, masterInfo.getState());
   }
 
   @Test
   public void testAMExpiry() throws Exception {
-    ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
-    context.setApplicationId(recordFactory.newRecordInstance(ApplicationId.class));
-    context.getApplicationId().setClusterTimestamp(System.currentTimeMillis());
-    context.getApplicationId().setId(1);
+    ApplicationSubmissionContext submissionContext = recordFactory
+        .newRecordInstance(ApplicationSubmissionContext.class);
+    submissionContext.setApplicationId(recordFactory
+        .newRecordInstance(ApplicationId.class));
+    submissionContext.getApplicationId().setClusterTimestamp(
+        System.currentTimeMillis());
+    submissionContext.getApplicationId().setId(1);
     
     tracker.addMaster(
         "dummy", 
-        context, "dummytoken");
-    ApplicationMasterInfo masterInfo = tracker.get(context.getApplicationId());
-    tracker.runApplication(context.getApplicationId());
+        submissionContext, "dummytoken");
+    ApplicationMasterInfo masterInfo = tracker.get(submissionContext.getApplicationId());
+    tracker.runApplication(submissionContext.getApplicationId());
     this.context.getDispatcher().getEventHandler().handle(
-        new ASMEvent<ApplicationEventType>(ApplicationEventType.ALLOCATED, masterInfo));
+        new ApplicationMasterAllocatedEvent(masterInfo.getApplicationID(),
+            masterInfo.getMasterContainer()));
     waitForState(masterInfo, ApplicationState.LAUNCHING);
     this.context.getDispatcher().getEventHandler().handle(
-    new ASMEvent<ApplicationEventType>(ApplicationEventType.LAUNCHED, masterInfo));
+        new ApplicationMasterInfoEvent(ApplicationEventType.LAUNCHED,
+            masterInfo.getApplicationID()));
     synchronized(expiry) {
       while (expiry.get() == 0) {
         expiry.wait(1000);

+ 17 - 16
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterLauncher.java

@@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
-import junit.framework.TestCase;
+import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,12 +31,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
 import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
+import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationFinishEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
@@ -49,7 +48,7 @@ import org.junit.Test;
  * Testing the applications manager launcher.
  *
  */
-public class TestApplicationMasterLauncher extends TestCase {
+public class TestApplicationMasterLauncher {
   private static final Log LOG = LogFactory.getLog(TestApplicationMasterLauncher.class);
   private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   private ApplicationMasterLauncher amLauncher;
@@ -63,10 +62,12 @@ public class TestApplicationMasterLauncher extends TestCase {
   AtomicInteger launched = new AtomicInteger();
   AtomicInteger cleanedUp = new AtomicInteger();
   private RMContext context = new ResourceManager.RMContextImpl(new MemStore());
+
+  private Configuration conf = new Configuration();
   
-  private class DummyASM implements EventHandler<ASMEvent<ApplicationEventType>> {
+  private class DummyASM implements EventHandler<ApplicationMasterInfoEvent> {
     @Override
-    public void handle(ASMEvent<ApplicationEventType> appEvent) {
+    public void handle(ApplicationMasterInfoEvent appEvent) {
       ApplicationEventType event = appEvent.getType();
       switch (event) {
       case FINISH:
@@ -132,7 +133,6 @@ public class TestApplicationMasterLauncher extends TestCase {
     asmHandle = new DummyASM();
     amLauncher = new DummyApplicationMasterLauncher(applicationTokenSecretManager,
         clientToAMSecretManager, asmHandle);
-    Configuration conf = new Configuration();
     context.getDispatcher().init(conf);
     amLauncher.init(conf);
     context.getDispatcher().start();
@@ -147,14 +147,15 @@ public class TestApplicationMasterLauncher extends TestCase {
 
   @Test
   public void testAMLauncher() throws Exception {
-    ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
-    context.setApplicationId(recordFactory.newRecordInstance(ApplicationId.class));
-    context.getApplicationId().setClusterTimestamp(System.currentTimeMillis());
-    context.getApplicationId().setId(1);
-    context.setUser("dummyuser");
+    ApplicationSubmissionContext submissionContext = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+    submissionContext.setApplicationId(recordFactory.newRecordInstance(ApplicationId.class));
+    submissionContext.getApplicationId().setClusterTimestamp(System.currentTimeMillis());
+    submissionContext.getApplicationId().setId(1);
+    submissionContext.setUser("dummyuser");
     ApplicationMasterInfo masterInfo = new ApplicationMasterInfo(this.context,
-        "dummyuser", context,
-        "dummyclienttoken", StoreFactory.createVoidAppStore());
+        this.conf, "dummyuser", submissionContext, "dummyclienttoken",
+        StoreFactory.createVoidAppStore(), new AMLivelinessMonitor(context
+            .getDispatcher().getEventHandler()));
     amLauncher.handle(new ASMEvent<AMLauncherEventType>(AMLauncherEventType.LAUNCH, 
       masterInfo));
     amLauncher.handle(new ASMEvent<AMLauncherEventType>(AMLauncherEventType.CLEANUP,  
@@ -162,7 +163,7 @@ public class TestApplicationMasterLauncher extends TestCase {
     synchronized (doneLaunching) {
       doneLaunching.wait(10000);
     }
-    assertTrue(launched.get() == 1);
-    assertTrue(cleanedUp.get() == 1);
+    Assert.assertEquals(1, launched.get());
+    Assert.assertEquals(1, cleanedUp.get());
   }
 }

+ 17 - 18
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java

@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 
 import junit.framework.Assert;
-import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -44,26 +43,24 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-public class TestSchedulerNegotiator extends TestCase {
+public class TestSchedulerNegotiator {
   private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   private SchedulerNegotiator schedulerNegotiator;
   private DummyScheduler scheduler;
@@ -72,7 +69,7 @@ public class TestSchedulerNegotiator extends TestCase {
   private final RMContext context = new ResourceManager.RMContextImpl(new MemStore());
   ApplicationMasterInfo masterInfo;
   private EventHandler handler;
-  
+  private Configuration conf = new Configuration();
   private class DummyScheduler implements ResourceScheduler {
     @Override
     public Allocation allocate(ApplicationId applicationId,
@@ -157,7 +154,6 @@ public class TestSchedulerNegotiator extends TestCase {
   public void setUp() {
     scheduler = new DummyScheduler();
     schedulerNegotiator = new SchedulerNegotiator(context, scheduler);
-    Configuration conf = new Configuration();
     schedulerNegotiator.init(conf);
     schedulerNegotiator.start();
     handler = context.getDispatcher().getEventHandler();
@@ -182,10 +178,10 @@ public class TestSchedulerNegotiator extends TestCase {
     }
     Assert.assertEquals(state, info.getState());
   }
-  
-  private class DummyEventHandler implements EventHandler<ASMEvent<ApplicationTrackerEventType>> {
+
+  private class DummyEventHandler implements EventHandler<ASMEvent<AMLauncherEventType>> {
     @Override
-    public void handle(ASMEvent<ApplicationTrackerEventType> event) {
+    public void handle(ASMEvent<AMLauncherEventType> event) {
     }
   }
 
@@ -196,15 +192,18 @@ public class TestSchedulerNegotiator extends TestCase {
     submissionContext.getApplicationId().setClusterTimestamp(System.currentTimeMillis());
     submissionContext.getApplicationId().setId(1);
     
-    masterInfo =
-      new ApplicationMasterInfo(this.context,
-          "dummy", submissionContext, "dummyClientToken", StoreFactory.createVoidAppStore());
+    masterInfo = new ApplicationMasterInfo(this.context, this.conf, "dummy",
+        submissionContext, "dummyClientToken", StoreFactory
+            .createVoidAppStore(), new AMLivelinessMonitor(context
+            .getDispatcher().getEventHandler()));
     context.getDispatcher().register(ApplicationEventType.class, masterInfo);
-    context.getDispatcher().register(ApplicationTrackerEventType.class, masterInfo);
-    handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.
-    ALLOCATE, masterInfo));
-    waitForState(ApplicationState.ALLOCATED, masterInfo);
+    context.getDispatcher().register(ApplicationTrackerEventType.class, scheduler);
+    context.getDispatcher().register(AMLauncherEventType.class,
+        new DummyEventHandler());
+    handler.handle(new ApplicationMasterInfoEvent(
+        ApplicationEventType.ALLOCATE, submissionContext.getApplicationId()));
+    waitForState(ApplicationState.LAUNCHING, masterInfo); // LAUNCHING because ALLOCATED automatically moves to LAUNCHING for now.
     Container container = masterInfo.getMasterContainer();
-    assertTrue(container.getId().getId() == testNum);
+    Assert.assertTrue(container.getId().getId() == testNum);
   }
 }