MAPREDUCE-2807. Fix AM restart and client redirection. Contributed by Sharad Agarwal.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1161408 13f79535-47bb-0310-9956-ffa450edef68
Sharad Agarwal 13 years ago
parent
commit
f2b91a8367
23 changed files with 651 additions and 446 deletions
  1. + 2 - 0
      hadoop-mapreduce-project/CHANGES.txt
  2. + 0 - 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
  3. + 87 - 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java
  4. + 137 - 317
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
  5. + 177 - 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java
  6. + 12 - 11
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
  7. + 53 - 17
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
  8. + 1 - 1
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationState.java
  9. + 45 - 6
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java
  10. + 5 - 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
  11. + 2 - 1
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
  12. + 2 - 18
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java
  13. + 4 - 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
  14. + 6 - 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
  15. + 22 - 2
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
  16. + 10 - 4
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
  17. + 19 - 8
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
  18. + 5 - 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
  19. + 6 - 2
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
  20. + 26 - 46
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
  21. + 1 - 1
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java
  22. + 20 - 11
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
  23. + 9 - 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java

+ 2 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -1138,6 +1138,8 @@ Trunk (unreleased changes)
     MAPREDUCE-2846. Fix missing synchronization in the task log management.
     (omalley)
 
+    MAPREDUCE-2807. Fix AM restart and client redirection. (sharad)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

+ 0 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java

@@ -369,7 +369,6 @@ public class TypeConverter {
     case SUBMITTED:
       return State.PREP;
     case RUNNING:
-    case RESTARTING:
       return State.RUNNING;
     case SUCCEEDED:
       return State.SUCCEEDED;

+ 87 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java

@@ -0,0 +1,87 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
+
+public class ClientCache {
+
+  private final Configuration conf;
+  private final ResourceMgrDelegate rm;
+  
+  private static final Log LOG = LogFactory.getLog(ClientCache.class);
+
+  private Map<JobID, ClientServiceDelegate> cache = 
+    new HashMap<JobID, ClientServiceDelegate>();
+  
+  private MRClientProtocol hsProxy;
+
+  ClientCache(Configuration conf, ResourceMgrDelegate rm) {
+    this.conf = conf;
+    this.rm = rm;
+  }
+
+  //TODO: evict from the cache on some threshold
+  synchronized ClientServiceDelegate getClient(JobID jobId) {
+    if (hsProxy == null) {
+      try {
+        hsProxy = instantiateHistoryProxy();
+      } catch (IOException e) {
+        LOG.warn("Could not connect to History server.", e);
+        throw new YarnException("Could not connect to History server.", e);
+      }
+    }
+    ClientServiceDelegate client = cache.get(jobId);
+    if (client == null) {
+      client = new ClientServiceDelegate(conf, rm, jobId, hsProxy);
+      cache.put(jobId, client);
+    }
+    return client;
+  }
+
+  private MRClientProtocol instantiateHistoryProxy()
+  throws IOException {
+    String serviceAddr = conf.get(JHConfig.HS_BIND_ADDRESS,
+        JHConfig.DEFAULT_HS_BIND_ADDRESS);
+    LOG.info("Connecting to HistoryServer at: " + serviceAddr);
+    Configuration myConf = new Configuration(conf);
+    //TODO This should ideally be using its own class (instead of ClientRMSecurityInfo)
+    myConf.setClass(YarnConfiguration.YARN_SECURITY_INFO,
+        ClientRMSecurityInfo.class, SecurityInfo.class);
+    YarnRPC rpc = YarnRPC.create(myConf);
+    LOG.info("Connected to HistoryServer at: " + serviceAddr);
+    return (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
+        NetUtils.createSocketAddr(serviceAddr), myConf);
+  }
+}
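
ClientCache keeps one ClientServiceDelegate per JobID and lazily creates a single shared history-server proxy under the same lock, so a job whose AM restarts invalidates only its own delegate. A minimal sketch of the get-or-create idiom, with illustrative names (DelegateCache and Factory are not part of the patch):

import java.util.HashMap;
import java.util.Map;

// Sketch of the per-key memoization idiom behind ClientCache.getClient(JobID).
class DelegateCache<K, V> {

  interface Factory<K, V> {
    V create(K key);
  }

  private final Map<K, V> cache = new HashMap<K, V>();

  // Get-or-create under one lock, mirroring the synchronized getClient(JobID).
  synchronized V get(K key, Factory<K, V> factory) {
    V value = cache.get(key);
    if (value == null) {
      value = factory.create(key);
      cache.put(key, value);
    }
    return value;
  }
}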

+ 137 - 317
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java

@@ -19,14 +19,13 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
-import java.net.UnknownHostException;
+import java.lang.reflect.Method;
 import java.security.PrivilegedAction;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobID;
@@ -37,16 +36,21 @@ import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsResponse;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsResponse;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -65,48 +69,47 @@ import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
 import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
 import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
 
-public class ClientServiceDelegate {
+class ClientServiceDelegate {
   private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
+  private static final NotRunningJob NOTSTARTEDJOB =
+      new NotRunningJob(JobState.NEW);
+
+  private static final NotRunningJob FAILEDJOB =
+      new NotRunningJob(JobState.FAILED);
+
+  private static final NotRunningJob KILLEDJOB =
+      new NotRunningJob(JobState.KILLED);
 
-  private Configuration conf;
-  private ApplicationId currentAppId;
-  private ApplicationState currentAppState = ApplicationState.NEW;
+  private final Configuration conf;
+  private final JobID jobId;
+  private final ApplicationId appId;
   private final ResourceMgrDelegate rm;
+  private final MRClientProtocol historyServerProxy;
+  private boolean forceRefresh;
   private MRClientProtocol realProxy = null;
-  private String serviceAddr = "";
-  private String serviceHttpAddr = "";
   private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
 
-  ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm) {
+  ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm, 
+      JobID jobId, MRClientProtocol historyServerProxy) {
     this.conf = new Configuration(conf); // Cloning for modifying.
     // For faster redirects from AM to HS.
     this.conf.setInt(
         CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 3);
     this.rm = rm;
+    this.jobId = jobId;
+    this.historyServerProxy = historyServerProxy;
+    this.appId = TypeConverter.toYarn(jobId).getAppId();
   }
 
-  private MRClientProtocol getProxy(JobID jobId) throws YarnRemoteException {
-    return getProxy(TypeConverter.toYarn(jobId).getAppId(), false);
-  }
-
-  private MRClientProtocol getRefreshedProxy(JobID jobId) throws YarnRemoteException {
-    return getProxy(TypeConverter.toYarn(jobId).getAppId(), true);
-  }
-
-  private MRClientProtocol getProxy(ApplicationId appId, 
-      boolean forceRefresh) throws YarnRemoteException {
-    if (!appId.equals(currentAppId) || forceRefresh || realProxy == null) {
-      currentAppId = appId;
-      refreshProxy();
+  private MRClientProtocol getProxy() throws YarnRemoteException {
+    if (!forceRefresh && realProxy != null) {
+      return realProxy;
     }
-    return realProxy;
-  }
-
-  private void refreshProxy() throws YarnRemoteException {
-    //TODO RM NPEs for unknown jobs. History may still be aware.
+    //TODO RM NPEs for unknown jobs. History may still be aware.
     // Possibly allow nulls through the PB tunnel, otherwise deal with an exception
     // and redirect to the history server.
-    ApplicationReport application = rm.getApplicationReport(currentAppId);
+    ApplicationReport application = rm.getApplicationReport(appId);
+    String serviceAddr = null;
     while (ApplicationState.RUNNING.equals(application.getState())) {
       try {
         if (application.getHost() == null || "".equals(application.getHost())) {
@@ -114,12 +117,10 @@ public class ClientServiceDelegate {
           Thread.sleep(2000);
    
           LOG.debug("Application state is " + application.getState());
-          application = rm.getApplicationReport(currentAppId);
+          application = rm.getApplicationReport(appId);
           continue;
         }
         serviceAddr = application.getHost() + ":" + application.getRpcPort();
-        serviceHttpAddr = application.getTrackingUrl();
-        currentAppState = application.getState();
         if (UserGroupInformation.isSecurityEnabled()) {
           String clientTokenEncoded = application.getClientToken();
           Token<ApplicationTokenIdentifier> clientToken =
@@ -129,9 +130,10 @@ public class ClientServiceDelegate {
               + application.getRpcPort()));
           UserGroupInformation.getCurrentUser().addToken(clientToken);
         }
+        LOG.info("Tracking Url of JOB is " + application.getTrackingUrl());
         LOG.info("Connecting to " + serviceAddr);
         instantiateAMProxy(serviceAddr);
-        return;
+        return realProxy;
       } catch (Exception e) {
         //possibly
         //possibly the AM has crashed
@@ -143,41 +145,39 @@ public class ClientServiceDelegate {
           Thread.sleep(2000);
         } catch (InterruptedException e1) {
         }
-        application = rm.getApplicationReport(currentAppId);
+        application = rm.getApplicationReport(appId);
       }
     }
 
-    currentAppState = application.getState();
     /** we just want to return if its allocating, so that we dont 
      * block on it. This is to be able to return job status 
      * on a allocating Application.
      */
     
-    if (currentAppState == ApplicationState.NEW) {
+    if (application.getState() == ApplicationState.NEW ||
+        application.getState() == ApplicationState.SUBMITTED) {
+      realProxy = null;
+      return NOTSTARTEDJOB;
+    }
+    
+    if (application.getState() == ApplicationState.FAILED) {
       realProxy = null;
-      return;
+      return FAILEDJOB;
     }
     
-    if (currentAppState == ApplicationState.SUCCEEDED
-        || currentAppState == ApplicationState.FAILED
-        || currentAppState == ApplicationState.KILLED) {
-      serviceAddr = conf.get(JHConfig.HS_BIND_ADDRESS,
-          JHConfig.DEFAULT_HS_BIND_ADDRESS);
+    if (application.getState() == ApplicationState.KILLED) {
+      realProxy = null;
+      return KILLEDJOB;
+    }
+    
+    //History server can serve a job only if application 
+    //succeeded.
+    if (application.getState() == ApplicationState.SUCCEEDED) {
       LOG.info("Application state is completed. " +
           "Redirecting to job history server " + serviceAddr);
-      try {
-        serviceHttpAddr = JobHistoryUtils.getHistoryUrl(conf, currentAppId);
-      } catch (UnknownHostException e) {
-        LOG.warn("Unable to get history url", e);
-        serviceHttpAddr = "UNKNOWN";
-      }
-      try {
-        instantiateHistoryProxy(serviceAddr);
-        return;
-      } catch (IOException e) {
-        throw new YarnException(e);
-      }
+      realProxy = historyServerProxy;
     }
+    return realProxy;
   }
 
   private void instantiateAMProxy(final String serviceAddr) throws IOException {
@@ -198,129 +198,70 @@ public class ClientServiceDelegate {
     LOG.trace("Connected to ApplicationMaster at: " + serviceAddr);
   }
 
-  private void instantiateHistoryProxy(final String serviceAddr)
-  throws IOException {
-    LOG.trace("Connecting to HistoryServer at: " + serviceAddr);
-    Configuration myConf = new Configuration(conf);
-    //TODO This should ideally be using it's own class (instead of ClientRMSecurityInfo)
-    myConf.setClass(YarnConfiguration.YARN_SECURITY_INFO,
-        ClientRMSecurityInfo.class, SecurityInfo.class);
-    YarnRPC rpc = YarnRPC.create(myConf);
-    realProxy = (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
-        NetUtils.createSocketAddr(serviceAddr), myConf);
-    LOG.trace("Connected to HistoryServer at: " + serviceAddr);
-  }
-
-  public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
-  InterruptedException {
-    org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0);
+  private synchronized Object invoke(String method, Class argClass, 
+      Object args) throws YarnRemoteException {
+    Method methodOb = null;
     try {
-      GetCountersRequest request = recordFactory.newRecordInstance(GetCountersRequest.class);
-      request.setJobId(jobID);
-      MRClientProtocol protocol = getProxy(arg0);
-      if (protocol == null) {
-        /* no AM to connect to, fake counters */
-        return new org.apache.hadoop.mapreduce.Counters();
-      }
-      return TypeConverter.fromYarn(protocol.getCounters(request).getCounters());
-    } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
-      LOG.warn(RPCUtil.toString(yre));
-      throw yre;
-    } catch(Exception e) {
-      LOG.debug("Failing to contact application master", e);
+      methodOb = MRClientProtocol.class.getMethod(method, argClass);
+    } catch (SecurityException e) {
+      throw new YarnException(e);
+    } catch (NoSuchMethodException e) {
+      throw new YarnException("Method name mismatch", e);
+    }
+    while (true) {
       try {
-        GetCountersRequest request = recordFactory.newRecordInstance(GetCountersRequest.class);
-        request.setJobId(jobID);
-        MRClientProtocol protocol = getRefreshedProxy(arg0);
-        if (protocol == null) {
-          /* no History to connect to, fake counters */
-          return new org.apache.hadoop.mapreduce.Counters();
-        }
-        return TypeConverter.fromYarn(protocol.getCounters(request).getCounters());
-      } catch(YarnRemoteException yre) {
+        return methodOb.invoke(getProxy(), args);
+      } catch (YarnRemoteException yre) {
+        LOG.warn("Exception thrown by remote end.");
         LOG.warn(RPCUtil.toString(yre));
         throw yre;
+      } catch (Exception e) {
+        LOG.info("Failed to contact AM for job " + jobId + "  Will retry..");
+        LOG.debug("Failing to contact application master", e);
+        forceRefresh = true;
       }
     }
   }
 
-  public String getJobHistoryDir() throws IOException, InterruptedException {
-    return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
-  }
-
-  public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
-      int arg2) throws IOException, InterruptedException {
+  org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
+  InterruptedException {
     org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0);
-    List<org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent> list = null;
-    GetTaskAttemptCompletionEventsRequest request = recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsRequest.class);
-    MRClientProtocol protocol;
-    try {
+    GetCountersRequest request = recordFactory.newRecordInstance(GetCountersRequest.class);
       request.setJobId(jobID);
-      request.setFromEventId(arg1);
-      request.setMaxEvents(arg2);
-      protocol = getProxy(arg0);
-      /** This is hack to get around the issue of faking jobstatus while the AM
-       * is coming up.
-       */
-      if (protocol == null) {
-        return new TaskCompletionEvent[0];
-      }
-      list = getProxy(arg0).getTaskAttemptCompletionEvents(request).getCompletionEventList();
-    } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
-      LOG.warn(RPCUtil.toString(yre));
-      throw yre;
-    } catch(Exception e) {
-      LOG.debug("Failed to contact application master ", e);
-      try {
-        request.setJobId(jobID);
-        request.setFromEventId(arg1);
-        request.setMaxEvents(arg2);
-        protocol = getRefreshedProxy(arg0);
-        if (protocol == null) {
-          return new TaskCompletionEvent[0];
-        }
-        list = protocol.getTaskAttemptCompletionEvents(request).getCompletionEventList();
-      } catch(YarnRemoteException yre) {
-        LOG.warn(RPCUtil.toString(yre));
-        throw yre;
-      }
-    }
-    return TypeConverter.fromYarn(
-        list.toArray(new org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[0]));
+    Counters cnt = ((GetCountersResponse)
+        invoke("getCounters", GetCountersRequest.class, request)).getCounters();
+    return TypeConverter.fromYarn(cnt);
   }
 
-  public String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID
-      arg0)
-  throws IOException,
-  InterruptedException {
+  TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1, int arg2)
+      throws IOException, InterruptedException {
+    org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter
+        .toYarn(arg0);
+    GetTaskAttemptCompletionEventsRequest request = recordFactory
+        .newRecordInstance(GetTaskAttemptCompletionEventsRequest.class);
+    request.setJobId(jobID);
+    request.setFromEventId(arg1);
+    request.setMaxEvents(arg2);
+    List<org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent> list = 
+      ((GetTaskAttemptCompletionEventsResponse) invoke(
+        "getTaskAttemptCompletionEvents", GetTaskAttemptCompletionEventsRequest.class, request)).
+        getCompletionEventList();
+    return TypeConverter
+        .fromYarn(list
+            .toArray(new org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[0]));
+  }
 
-    List<String> list = null;
-    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter.toYarn(arg0);
-    GetDiagnosticsRequest request = recordFactory.newRecordInstance(GetDiagnosticsRequest.class);
-    MRClientProtocol protocol;
-    try {
-      request.setTaskAttemptId(attemptID);
-      protocol = getProxy(arg0.getJobID());
-      if (protocol == null) {
-        return new String[0];
-      }
-      list = getProxy(arg0.getJobID()).getDiagnostics(request).getDiagnosticsList();
-    } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
-      LOG.warn(RPCUtil.toString(yre));
-      throw yre;
-    } catch(Exception e) {
-      LOG.debug("Failed to contact application master ", e);
-      try {
-        protocol = getRefreshedProxy(arg0.getJobID());
-        if (protocol == null) {
-          return new String[0];
-        }
-        list = protocol.getDiagnostics(request).getDiagnosticsList();
-      } catch(YarnRemoteException yre) {
-        LOG.warn(RPCUtil.toString(yre));
-        throw yre;
-      }
-    }
+  String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID arg0)
+      throws IOException, InterruptedException {
+
+    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter
+        .toYarn(arg0);
+    GetDiagnosticsRequest request = recordFactory
+        .newRecordInstance(GetDiagnosticsRequest.class);
+    request.setTaskAttemptId(attemptID);
+    List<String> list = ((GetDiagnosticsResponse) invoke("getDiagnostics",
+        GetDiagnosticsRequest.class, request)).getDiagnosticsList();
     String[] result = new String[list.size()];
     int i = 0;
     for (String c : list) {
@@ -329,180 +270,59 @@ public class ClientServiceDelegate {
     return result;
   }
   
-  private JobStatus createFakeJobReport(ApplicationState state, 
-      org.apache.hadoop.mapreduce.v2.api.records.JobId jobId, String jobFile) {
-    JobReport jobreport = recordFactory.newRecordInstance(JobReport.class);
-    jobreport.setCleanupProgress(0);
-    jobreport.setFinishTime(0);
-    jobreport.setJobId(jobId);
-    jobreport.setMapProgress(0);
-    /** fix this, the start time should be fixed */
-    jobreport.setStartTime(0);
-    jobreport.setReduceProgress(0);
-    jobreport.setSetupProgress(0);
-
-    if (currentAppState == ApplicationState.NEW) {
-      /* the protocol wasnt instantiated because the applicaton wasnt launched
-       * return a fake report.
-       */
-      jobreport.setJobState(JobState.NEW);
-    } else if (currentAppState == ApplicationState.SUBMITTED) {
-      jobreport.setJobState(JobState.NEW);
-    } else if (currentAppState == ApplicationState.KILLED) {
-      jobreport.setJobState(JobState.KILLED);
-    } else if (currentAppState == ApplicationState.FAILED) {
-      jobreport.setJobState(JobState.FAILED);
-    }
-    return  TypeConverter.fromYarn(jobreport, jobFile, serviceHttpAddr);
-  }
-
-  public JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException,
-  YarnRemoteException {
+  JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException {
     org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = 
       TypeConverter.toYarn(oldJobID);
     String stagingDir = conf.get("yarn.apps.stagingDir");
     String jobFile = stagingDir + "/" + jobId.toString();
-    JobReport report = null;
     MRClientProtocol protocol;
     GetJobReportRequest request = recordFactory.newRecordInstance(GetJobReportRequest.class);
-    try {
-      request.setJobId(jobId);
-      protocol = getProxy(oldJobID);
-      
-      if (protocol == null) {
-        return createFakeJobReport(currentAppState, jobId, jobFile);
-      }
-      report = getProxy(oldJobID).getJobReport(request).getJobReport();
-    } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
-      LOG.warn(RPCUtil.toString(yre));
-      throw yre;
-    } catch (Exception e) {
-      try {
-        request.setJobId(jobId);
-        protocol = getRefreshedProxy(oldJobID);
-        /* this is possible if an application that was running is killed */
-        if (protocol == null)  {
-          return createFakeJobReport(currentAppState, jobId, jobFile);
-        }
-        report = protocol.getJobReport(request).getJobReport();
-      } catch(YarnRemoteException yre) {
-        LOG.warn(RPCUtil.toString(yre));
-        throw yre;
-      }
-    }
-    return TypeConverter.fromYarn(report, jobFile, serviceHttpAddr);
+    request.setJobId(jobId);
+    JobReport  report = ((GetJobReportResponse) invoke("getJobReport", 
+        GetJobReportRequest.class, request)).getJobReport();
+    //TODO: add tracking url in JobReport
+    return TypeConverter.fromYarn(report, jobFile, "");
   }
 
-  public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID jobID, TaskType taskType)
-  throws YarnRemoteException, YarnRemoteException {
-    List<org.apache.hadoop.mapreduce.v2.api.records.TaskReport> taskReports = null;
+  org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID jobID, TaskType taskType)
+       throws YarnRemoteException {
     org.apache.hadoop.mapreduce.v2.api.records.JobId nJobID = TypeConverter.toYarn(jobID);
     GetTaskReportsRequest request = recordFactory.newRecordInstance(GetTaskReportsRequest.class);
-    MRClientProtocol protocol = null;
-    try {
-      request.setJobId(nJobID);
-      request.setTaskType(TypeConverter.toYarn(taskType));
-      protocol = getProxy(jobID);
-      if (protocol == null) {
-        return new org.apache.hadoop.mapreduce.TaskReport[0];
-      }
-      taskReports = getProxy(jobID).getTaskReports(request).getTaskReportList();
-    } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
-      LOG.warn(RPCUtil.toString(yre));
-      throw yre;
-    } catch(Exception e) {
-      LOG.debug("Failed to contact application master ", e);
-      try {
-        request.setJobId(nJobID);
-        request.setTaskType(TypeConverter.toYarn(taskType));
-        protocol = getRefreshedProxy(jobID);
-        if (protocol == null) {
-          return new org.apache.hadoop.mapreduce.TaskReport[0];
-        }
-        taskReports = protocol.getTaskReports(request).getTaskReportList();
-      } catch(YarnRemoteException yre) {
-        LOG.warn(RPCUtil.toString(yre));
-        throw yre;
-      }
-    }
+    
+    List<org.apache.hadoop.mapreduce.v2.api.records.TaskReport> taskReports = 
+      ((GetTaskReportsResponse) invoke("getTaskReports", GetTaskReportsRequest.class, 
+          request)).getTaskReportList();
+    
     return TypeConverter.fromYarn
     (taskReports).toArray(new org.apache.hadoop.mapreduce.TaskReport[0]);
   }
 
-  public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
-  throws YarnRemoteException {
+  boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
+       throws YarnRemoteException {
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID 
-    = TypeConverter.toYarn(taskAttemptID);
-    KillTaskAttemptRequest killRequest = recordFactory.newRecordInstance(KillTaskAttemptRequest.class);
-    FailTaskAttemptRequest failRequest = recordFactory.newRecordInstance(FailTaskAttemptRequest.class);
-    MRClientProtocol protocol = getProxy(taskAttemptID.getJobID());
-    if (protocol == null) {
-      return false;
-    }
-    try {
-      if (fail) {
-        failRequest.setTaskAttemptId(attemptID);
-        getProxy(taskAttemptID.getJobID()).failTaskAttempt(failRequest);
-      } else {
-        killRequest.setTaskAttemptId(attemptID);
-        getProxy(taskAttemptID.getJobID()).killTaskAttempt(killRequest);
-      }
-    } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
-      LOG.warn(RPCUtil.toString(yre));
-      throw yre;
-    } catch(Exception e) {
-      LOG.debug("Failed to contact application master ", e);
-      MRClientProtocol proxy = getRefreshedProxy(taskAttemptID.getJobID());
-      if (proxy == null) {
-        return false;
-      }
-      try {
-        if (fail) {
-          failRequest.setTaskAttemptId(attemptID);
-          proxy.failTaskAttempt(failRequest);
-        } else {
-          killRequest.setTaskAttemptId(attemptID);
-          proxy.killTaskAttempt(killRequest);
-        }
-      } catch(YarnRemoteException yre) {
-        LOG.warn(RPCUtil.toString(yre));
-        throw yre;
-      }
+      = TypeConverter.toYarn(taskAttemptID);
+    if (fail) {
+      FailTaskAttemptRequest failRequest = recordFactory.newRecordInstance(FailTaskAttemptRequest.class);
+      failRequest.setTaskAttemptId(attemptID);
+      invoke("failTaskAttempt", FailTaskAttemptRequest.class, failRequest);
+    } else {
+      KillTaskAttemptRequest killRequest = recordFactory.newRecordInstance(KillTaskAttemptRequest.class);
+      killRequest.setTaskAttemptId(attemptID);
+      invoke("killTaskAttempt", KillTaskAttemptRequest.class, killRequest);
     }
     return true;
   }
   
-  public boolean killJob(JobID oldJobID)
-  throws YarnRemoteException {
+  boolean killJob(JobID oldJobID)
+       throws YarnRemoteException {
     org.apache.hadoop.mapreduce.v2.api.records.JobId jobId 
     = TypeConverter.toYarn(oldJobID);
     KillJobRequest killRequest = recordFactory.newRecordInstance(KillJobRequest.class);
-    MRClientProtocol protocol = getProxy(oldJobID);
-    if (protocol == null) {
-      return false;
-    }
-    try {
-      killRequest.setJobId(jobId);
-      protocol.killJob(killRequest);
-      return true;
-    } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
-      LOG.warn(RPCUtil.toString(yre));
-      throw yre;
-    } catch(Exception e) {
-      // Not really requied - if this is always the history context.
-      LOG.debug("Failed to contact application master ", e);
-      MRClientProtocol proxy = getRefreshedProxy(oldJobID);
-      if (proxy == null) {
-        return false;
-      }
-      try {
-        killRequest.setJobId(jobId);
-        protocol.killJob(killRequest);
-        return true;
-      } catch(YarnRemoteException yre) {
-        LOG.warn(RPCUtil.toString(yre));
-        throw yre;
-      }
-    }
+    killRequest.setJobId(jobId);
+    invoke("killJob", KillJobRequest.class, killRequest);
+    return true;
   }
 }
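
The bulk of this file's rewrite collapses six near-identical try/contact-AM/refresh/retry blocks into one reflective invoke(): each public method now only builds its request record, and invoke() re-resolves the proxy (live AM, history server, or a NotRunningJob stub) whenever the AM is unreachable, rethrowing YarnRemoteExceptions raised by the remote side rather than retrying them. A condensed sketch of that loop with stand-in names (RetryingInvoker, ProxySupplier); unlike the real code, this sketch retries every invocation failure:

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Sketch of the retry-via-reflection idiom in ClientServiceDelegate.invoke().
class RetryingInvoker<P> {

  interface ProxySupplier<P> {
    P resolve() throws Exception;
  }

  private final Class<P> protocol;
  private final ProxySupplier<P> supplier;
  private P proxy;

  RetryingInvoker(Class<P> protocol, ProxySupplier<P> supplier) {
    this.protocol = protocol;
    this.supplier = supplier;
  }

  synchronized Object invoke(String methodName, Class<?> argClass, Object arg)
      throws Exception {
    Method method = protocol.getMethod(methodName, argClass);
    while (true) {
      try {
        if (proxy == null) {
          proxy = supplier.resolve(); // AM, history server, or a stub
        }
        return method.invoke(proxy, arg);
      } catch (InvocationTargetException e) {
        // The real invoke() rethrows server-side YarnRemoteExceptions here
        // and only falls through to retry on connectivity failures.
        proxy = null; // force re-resolution on the next iteration
      }
    }
  }
}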

+ 177 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java

@@ -0,0 +1,177 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
+import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+
+public class NotRunningJob implements MRClientProtocol {
+
+  private RecordFactory recordFactory = 
+    RecordFactoryProvider.getRecordFactory(null);
+  
+  private final JobState jobState;
+
+  NotRunningJob(JobState jobState) {
+    this.jobState = jobState;
+  }
+
+  @Override
+  public FailTaskAttemptResponse failTaskAttempt(
+      FailTaskAttemptRequest request) throws YarnRemoteException {
+    FailTaskAttemptResponse resp = 
+      recordFactory.newRecordInstance(FailTaskAttemptResponse.class);
+    return resp;
+  }
+
+  @Override
+  public GetCountersResponse getCounters(GetCountersRequest request)
+      throws YarnRemoteException {
+    GetCountersResponse resp = 
+      recordFactory.newRecordInstance(GetCountersResponse.class);
+    Counters counters = recordFactory.newRecordInstance(Counters.class);
+    counters.addAllCounterGroups(new HashMap<String, CounterGroup>());
+    resp.setCounters(counters);
+    return resp;
+  }
+
+  @Override
+  public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request)
+      throws YarnRemoteException {
+    GetDiagnosticsResponse resp = 
+      recordFactory.newRecordInstance(GetDiagnosticsResponse.class);
+    resp.addDiagnostics("");
+    return resp;
+  }
+
+  @Override
+  public GetJobReportResponse getJobReport(GetJobReportRequest request)
+      throws YarnRemoteException {
+    GetJobReportResponse resp = 
+      recordFactory.newRecordInstance(GetJobReportResponse.class);
+    JobReport jobReport =
+      recordFactory.newRecordInstance(JobReport.class);
+    jobReport.setJobId(request.getJobId());
+    jobReport.setJobState(jobState);
+    resp.setJobReport(jobReport);
+    return resp;
+  }
+
+  @Override
+  public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
+      GetTaskAttemptCompletionEventsRequest request)
+      throws YarnRemoteException {
+    GetTaskAttemptCompletionEventsResponse resp = 
+      recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
+    resp.addAllCompletionEvents(new ArrayList<TaskAttemptCompletionEvent>());
+    return resp;
+  }
+
+  @Override
+  public GetTaskAttemptReportResponse getTaskAttemptReport(
+      GetTaskAttemptReportRequest request) throws YarnRemoteException {
+    //not invoked by anybody
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetTaskReportResponse getTaskReport(GetTaskReportRequest request)
+      throws YarnRemoteException {
+    GetTaskReportResponse resp = 
+      recordFactory.newRecordInstance(GetTaskReportResponse.class);
+    TaskReport report = recordFactory.newRecordInstance(TaskReport.class);
+    report.setTaskId(request.getTaskId());
+    report.setTaskState(TaskState.NEW);
+    Counters counters = recordFactory.newRecordInstance(Counters.class);
+    counters.addAllCounterGroups(new HashMap<String, CounterGroup>());
+    report.setCounters(counters);
+    report.addAllRunningAttempts(new ArrayList<TaskAttemptId>());
+    resp.setTaskReport(report);
+    return resp;
+  }
+
+  @Override
+  public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request)
+      throws YarnRemoteException {
+    GetTaskReportsResponse resp = 
+      recordFactory.newRecordInstance(GetTaskReportsResponse.class);
+    resp.addAllTaskReports(new ArrayList<TaskReport>());
+    return resp;
+  }
+
+  @Override
+  public KillJobResponse killJob(KillJobRequest request)
+      throws YarnRemoteException {
+    KillJobResponse resp = 
+      recordFactory.newRecordInstance(KillJobResponse.class);
+    return resp;
+  }
+
+  @Override
+  public KillTaskResponse killTask(KillTaskRequest request)
+      throws YarnRemoteException {
+    KillTaskResponse resp = 
+      recordFactory.newRecordInstance(KillTaskResponse.class);
+    return resp;
+  }
+
+  @Override
+  public KillTaskAttemptResponse killTaskAttempt(
+      KillTaskAttemptRequest request) throws YarnRemoteException {
+    KillTaskAttemptResponse resp = 
+      recordFactory.newRecordInstance(KillTaskAttemptResponse.class);
+    return resp;
+  }
+  
+}
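
NotRunningJob is a null object: instead of handing callers a null proxy and forcing every method in ClientServiceDelegate to special-case it, getProxy() returns an in-process MRClientProtocol that answers with empty but well-formed records for a fixed JobState. A usage sketch, assuming same-package access to the package-private constructor:

package org.apache.hadoop.mapred;

import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;

// Sketch: with the null-object stub, "the AM is not up yet" needs no branches.
class NullObjectDemo {

  static Counters countersWhileStarting(JobId jobId) throws Exception {
    RecordFactory rf = RecordFactoryProvider.getRecordFactory(null);
    MRClientProtocol stub = new NotRunningJob(JobState.NEW); // always answers
    GetCountersRequest request = rf.newRecordInstance(GetCountersRequest.class);
    request.setJobId(jobId);
    return stub.getCounters(request).getCounters(); // empty, never null
  }
}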

+ 12 - 11
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java

@@ -61,6 +61,7 @@ import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.mapreduce.v2.ClientConstants;
 import org.apache.hadoop.mapreduce.v2.MRConstants;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -98,7 +99,7 @@ public class YARNRunner implements ClientProtocol {
   
   private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   private ResourceMgrDelegate resMgrDelegate;
-  private ClientServiceDelegate clientServiceDelegate;
+  private ClientCache clientCache;
   private YarnConfiguration conf;
   private final FileContext defaultFileContext;
 
@@ -111,7 +112,7 @@ public class YARNRunner implements ClientProtocol {
     this.conf = new YarnConfiguration(conf);
     try {
       this.resMgrDelegate = new ResourceMgrDelegate(this.conf);
-      this.clientServiceDelegate = new ClientServiceDelegate(this.conf,
+      this.clientCache = new ClientCache(this.conf,
           resMgrDelegate);
       this.defaultFileContext = FileContext.getFileContext(this.conf);
     } catch (UnsupportedFileSystemException ufe) {
@@ -248,7 +249,7 @@ public class YARNRunner implements ClientProtocol {
         || appMaster.getState() == ApplicationState.KILLED) {
       throw RPCUtil.getRemoteException("failed to run job");
     }
-    return clientServiceDelegate.getJobStatus(jobId);
+    return clientCache.getClient(jobId).getJobStatus(jobId);
   }
 
   private LocalResource createApplicationResource(FileContext fs, Path p)
@@ -519,43 +520,43 @@ public class YARNRunner implements ClientProtocol {
   @Override
   public Counters getJobCounters(JobID arg0) throws IOException,
       InterruptedException {
-    return clientServiceDelegate.getJobCounters(arg0);
+    return clientCache.getClient(arg0).getJobCounters(arg0);
   }
 
   @Override
   public String getJobHistoryDir() throws IOException, InterruptedException {
-    return clientServiceDelegate.getJobHistoryDir();
+    return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
   }
 
   @Override
   public JobStatus getJobStatus(JobID jobID) throws IOException,
       InterruptedException {
-    JobStatus status = clientServiceDelegate.getJobStatus(jobID);
+    JobStatus status = clientCache.getClient(jobID).getJobStatus(jobID);
     return status;
   }
   
   @Override
   public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
       int arg2) throws IOException, InterruptedException {
-    return clientServiceDelegate.getTaskCompletionEvents(arg0, arg1, arg2);
+    return clientCache.getClient(arg0).getTaskCompletionEvents(arg0, arg1, arg2);
   }
 
   @Override
   public String[] getTaskDiagnostics(TaskAttemptID arg0) throws IOException,
       InterruptedException {
-    return clientServiceDelegate.getTaskDiagnostics(arg0);
+    return clientCache.getClient(arg0.getJobID()).getTaskDiagnostics(arg0);
   }
 
   @Override
   public TaskReport[] getTaskReports(JobID jobID, TaskType taskType)
   throws IOException, InterruptedException {
-    return clientServiceDelegate
+    return clientCache.getClient(jobID)
         .getTaskReports(jobID, taskType);
   }
 
   @Override
   public void killJob(JobID arg0) throws IOException, InterruptedException {
-    if (!clientServiceDelegate.killJob(arg0)) {
+    if (!clientCache.getClient(arg0).killJob(arg0)) {
     resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
   }
   }
@@ -563,7 +564,7 @@ public class YARNRunner implements ClientProtocol {
   @Override
   public boolean killTask(TaskAttemptID arg0, boolean arg1) throws IOException,
       InterruptedException {
-    return clientServiceDelegate.killTask(arg0, arg1);
+    return clientCache.getClient(arg0.getJobID()).killTask(arg0, arg1);
   }
 
   @Override

+ 53 - 17
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.mapred;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
 import java.util.Iterator;
 
 import junit.framework.Assert;
@@ -30,7 +31,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
@@ -60,6 +64,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.Counters;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
@@ -86,7 +91,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationState;
-import org.apache.hadoop.yarn.api.records.ApplicationStatus;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -111,6 +115,7 @@ public class TestClientRedirect {
   private volatile boolean amContact = false; 
   private volatile boolean hsContact = false;
   private volatile boolean amRunning = false;
+  private volatile boolean amRestarting = false;
 
   @Test
   public void testRedirect() throws Exception {
@@ -138,17 +143,9 @@ public class TestClientRedirect {
       new org.apache.hadoop.mapred.JobID("201103121733", 1);
     org.apache.hadoop.mapreduce.Counters counters = cluster.getJob(jobID)
         .getCounters();
-    Iterator<org.apache.hadoop.mapreduce.CounterGroup> it = counters.iterator();
-    while (it.hasNext()) {
-      org.apache.hadoop.mapreduce.CounterGroup group = it.next();
-      LOG.info("Group " + group.getDisplayName());
-      Iterator<org.apache.hadoop.mapreduce.Counter> itc = group.iterator();
-      while (itc.hasNext()) {
-        LOG.info("Counter is " + itc.next().getDisplayName());
-      }
-    }
+    validateCounters(counters);
     Assert.assertTrue(amContact);
-
+   
     LOG.info("Sleeping for 5 seconds before stop for" +
     " the client socket to not get EOF immediately..");
     Thread.sleep(5000);
@@ -160,10 +157,51 @@ public class TestClientRedirect {
     LOG.info("Sleeping for 5 seconds after stop for" +
     		" the server to exit cleanly..");
     Thread.sleep(5000);
+    
+    amRestarting = true;
+    // Same client
+    //results are returned from fake (not started job)
+    counters = cluster.getJob(jobID).getCounters();
+    Assert.assertEquals(0, counters.countCounters());
+    Job job = cluster.getJob(jobID);
+    org.apache.hadoop.mapreduce.TaskID taskId = 
+      new org.apache.hadoop.mapreduce.TaskID(jobID, TaskType.MAP, 0);
+    TaskAttemptID tId = new TaskAttemptID(taskId, 0);
+    
+    //invoke all methods to check that no exception is thrown
+    job.killJob();
+    job.killTask(tId);
+    job.failTask(tId);
+    job.getTaskCompletionEvents(0, 100);
+    job.getStatus();
+    job.getTaskDiagnostics(tId);
+    job.getTaskReports(TaskType.MAP);
+    job.getTrackingURL();
+    
+    amRestarting = false;
+    amService = new AMService();
+    amService.init(conf);
+    amService.start(conf);
+    amRunning = true;
+    amContact = false; //reset
+    
+    counters = cluster.getJob(jobID).getCounters();
+    validateCounters(counters);
+    Assert.assertTrue(amContact);
+    
+    amRunning = false;
 
     // Same client
     counters = cluster.getJob(jobID).getCounters();
-    it = counters.iterator();
+    validateCounters(counters);
+    Assert.assertTrue(hsContact);
+    
+    rmService.stop();
+    historyService.stop();
+  }
+
+  private void validateCounters(org.apache.hadoop.mapreduce.Counters counters) {
+    Iterator<org.apache.hadoop.mapreduce.CounterGroup> it = counters.iterator();
     while (it.hasNext()) {
       org.apache.hadoop.mapreduce.CounterGroup group = it.next();
       LOG.info("Group " + group.getDisplayName());
@@ -172,11 +210,7 @@ public class TestClientRedirect {
         LOG.info("Counter is " + itc.next().getDisplayName());
       }
     }
-
-    Assert.assertTrue(hsContact);
-    
-    rmService.stop();
-    historyService.stop();
+    Assert.assertEquals(1, counters.countCounters());
   }
 
   class RMService extends AbstractService implements ClientRMProtocol {
@@ -226,6 +260,8 @@ public class TestClientRedirect {
       application.setApplicationId(applicationId);
       if (amRunning) {
         application.setState(ApplicationState.RUNNING);
+      } else if (amRestarting) {
+        application.setState(ApplicationState.SUBMITTED);
       } else {
         application.setState(ApplicationState.SUCCEEDED);
       }
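
The test now walks one client through every state the fake RM can report. Under the new getProxy() logic in ClientServiceDelegate, each ApplicationState selects a distinct backend; a compact restatement of that dispatch (the Backend enum is illustrative, not part of the patch):

import org.apache.hadoop.yarn.api.records.ApplicationState;

// Sketch: which backend serves the client for each RM-reported state,
// matching the branches the test drives via amRunning/amRestarting.
class ProxyDispatch {

  enum Backend { APP_MASTER, NOT_STARTED_STUB, FAILED_STUB, KILLED_STUB, HISTORY_SERVER }

  static Backend backendFor(ApplicationState state) {
    switch (state) {
      case RUNNING:   return Backend.APP_MASTER;        // live AM proxy
      case NEW:
      case SUBMITTED: return Backend.NOT_STARTED_STUB;  // NotRunningJob(NEW)
      case FAILED:    return Backend.FAILED_STUB;       // NotRunningJob(FAILED)
      case KILLED:    return Backend.KILLED_STUB;       // NotRunningJob(KILLED)
      case SUCCEEDED: return Backend.HISTORY_SERVER;    // completed-job redirect
      default:        throw new IllegalArgumentException(String.valueOf(state));
    }
  }
}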

+ 1 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationState.java

@@ -19,5 +19,5 @@
 package org.apache.hadoop.yarn.api.records;
 
 public enum ApplicationState {
-  NEW, SUBMITTED, RUNNING, RESTARTING, SUCCEEDED, FAILED, KILLED
+  NEW, SUBMITTED, RUNNING, SUCCEEDED, FAILED, KILLED
 }

+ 45 - 6
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java

@@ -11,6 +11,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProtoOrBuilder;
+import org.mortbay.log.Log;
 
 
     
@@ -33,7 +34,40 @@ public class ContainerIdPBImpl extends ProtoBase<ContainerIdProto> implements Co
     counterFormat.setMinimumIntegerDigits(6);
   }
   
-  
+  // TODO: Why thread local?
+  // ^ NumberFormat instances are not threadsafe
+  private static final ThreadLocal<NumberFormat> appIdFormat = new ThreadLocal<NumberFormat>() {
+    @Override
+    public NumberFormat initialValue() {
+      NumberFormat fmt = NumberFormat.getInstance();
+      fmt.setGroupingUsed(false);
+      fmt.setMinimumIntegerDigits(4);
+      return fmt;
+    }
+  };
+
+  // TODO: fail the app submission if attempts are more than 10 or something
+  private static final ThreadLocal<NumberFormat> appAttemptIdFormat = new ThreadLocal<NumberFormat>() {
+    @Override
+    public NumberFormat initialValue() {
+      NumberFormat fmt = NumberFormat.getInstance();
+      fmt.setGroupingUsed(false);
+      fmt.setMinimumIntegerDigits(2);
+      return fmt;
+    }
+  };
+  // TODO: Why thread local?
+  // ^ NumberFormat instances are not threadsafe
+  private static final ThreadLocal<NumberFormat> containerIdFormat = new ThreadLocal<NumberFormat>() {
+    @Override
+    public NumberFormat initialValue() {
+      NumberFormat fmt = NumberFormat.getInstance();
+      fmt.setGroupingUsed(false);
+      fmt.setMinimumIntegerDigits(6);
+      return fmt;
+    }
+  };
+    
   public ContainerIdPBImpl() {
     builder = ContainerIdProto.newBuilder();
   }
@@ -162,18 +196,23 @@ public class ContainerIdPBImpl extends ProtoBase<ContainerIdProto> implements Co
 
   @Override
   public int compareTo(ContainerId other) {
-    if (this.getAppId().compareTo(other.getAppId()) == 0) {
+    if (this.getAppAttemptId().compareTo(other.getAppAttemptId()) == 0) {
       return this.getId() - other.getId();
     } else {
-      return this.getAppId().compareTo(other.getAppId());
+      return this.getAppAttemptId().compareTo(other.getAppAttemptId());
     }
     
   }
   
   @Override
   public String toString() {
-    String id = (this.getAppId() != null) ? this.getAppId().getClusterTimestamp() + "_" +
-        idFormat.format(this.getAppId().getId()): "none";
-    return "containerid_" + id + "_" + counterFormat.format(getId());
+    StringBuilder sb = new StringBuilder();
+    ApplicationId appId = getAppId();
+    sb.append("container_").append(appId.getClusterTimestamp()).append("_");
+    sb.append(appIdFormat.get().format(appId.getId())).append("_");
+    sb.append(appAttemptIdFormat.get().format(getAppAttemptId().
+        getAttemptId())).append("_");
+    sb.append(containerIdFormat.get().format(getId()));
+    return sb.toString();
   }
 }  
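
The rewritten toString() moves the id scheme from containerid_&lt;timestamp&gt;_&lt;appId&gt;_&lt;counter&gt; to container_&lt;timestamp&gt;_&lt;appId&gt;_&lt;attemptId&gt;_&lt;containerId&gt;, zero-padded to widths 4, 2 and 6 respectively. A standalone sketch reproducing the format (the widths come from the patch; the sample values are made up):

import java.text.NumberFormat;

// Sketch of the new container id string:
// container_<clusterTimestamp>_<appId:4>_<attemptId:2>_<containerId:6>
class ContainerIdFormat {

  private static NumberFormat padded(int digits) {
    NumberFormat fmt = NumberFormat.getInstance();
    fmt.setGroupingUsed(false);
    fmt.setMinimumIntegerDigits(digits);
    return fmt;
  }

  static String format(long clusterTimestamp, int appId, int attemptId, int containerId) {
    return "container_" + clusterTimestamp
        + "_" + padded(4).format(appId)
        + "_" + padded(2).format(attemptId)
        + "_" + padded(6).format(containerId);
  }

  public static void main(String[] args) {
    // Prints container_1316000000000_0001_01_000002 (sample values).
    System.out.println(format(1316000000000L, 1, 1, 2));
  }
}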

+ 5 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java

@@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -68,6 +69,7 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
   public void write(DataOutput out) throws IOException {
     LOG.debug("Writing ContainerTokenIdentifier to RPC layer");
     out.writeInt(this.containerId.getAppId().getId());
+    out.writeInt(this.containerId.getAppAttemptId().getAttemptId());
     out.writeInt(this.containerId.getId());
     // TODO: Cluster time-stamp?
     out.writeUTF(this.nmHostName);
@@ -78,7 +80,10 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
   public void readFields(DataInput in) throws IOException {
     this.containerId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ContainerId.class);
     this.containerId.setAppId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class));
+    this.containerId.setAppAttemptId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationAttemptId.class));
     this.containerId.getAppId().setId(in.readInt());
+    this.containerId.getAppAttemptId().setApplicationId(this.containerId.getAppId());
+    this.containerId.getAppAttemptId().setAttemptId(in.readInt());
     this.containerId.setId(in.readInt());
     this.nmHostName = in.readUTF();
     this.resource = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
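
Note: write() and readFields() must stay in lockstep, and both now carry the attempt id between the application id and the container id. A stripped-down sketch of just that field ordering (names are illustrative; the real identifier also serializes the NM hostname and the resource):

    import java.io.*;

    public class TokenWireLayoutDemo {
      static byte[] write(int appId, int attemptId, int containerId)
          throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeInt(appId);       // containerId.getAppId().getId()
        out.writeInt(attemptId);   // the field this commit adds
        out.writeInt(containerId); // containerId.getId()
        return buf.toByteArray();
      }

      static int[] read(byte[] bytes) throws IOException {
        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(bytes));
        return new int[] { in.readInt(), in.readInt(), in.readInt() };
      }

      public static void main(String[] args) throws IOException {
        int[] f = read(write(7, 1, 42));
        System.out.println(f[0] + " " + f[1] + " " + f[2]); // 7 1 42
      }
    }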

+ 2 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java

@@ -134,11 +134,12 @@ public class BuilderUtils {
   }
 
   public static ContainerId newContainerId(RecordFactory recordFactory,
-      ApplicationId appId,
+      ApplicationId appId, ApplicationAttemptId appAttemptId,
       int containerId) {
     ContainerId id = recordFactory.newRecordInstance(ContainerId.class);
     id.setAppId(appId);
     id.setId(containerId);
+    id.setAppAttemptId(appAttemptId);
     return id;
   }
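
Usage note: every caller of newContainerId must now supply the attempt. A fragment mirroring how the updated tests build one (assumes a RecordFactory named recordFactory and an ApplicationId named appId are in scope, as in those tests):

    ApplicationAttemptId attemptId =
        recordFactory.newRecordInstance(ApplicationAttemptId.class);
    attemptId.setApplicationId(appId);
    attemptId.setAttemptId(1);
    ContainerId cId =
        BuilderUtils.newContainerId(recordFactory, appId, attemptId, 0);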
 

+ 2 - 18
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java

@@ -104,18 +104,7 @@ public class ConverterUtils {
       }
     };
 
-  // TODO: Why thread local?
-  // ^ NumberFormat instances are not threadsafe
-  private static final ThreadLocal<NumberFormat> containerIdFormat =
-      new ThreadLocal<NumberFormat>() {
-        @Override
-        public NumberFormat initialValue() {
-          NumberFormat fmt = NumberFormat.getInstance();
-          fmt.setGroupingUsed(false);
-          fmt.setMinimumIntegerDigits(6);
-          return fmt;
-        }
-      };
+  
 
   public static String toString(ApplicationId appId) {
     StringBuilder sb = new StringBuilder();
@@ -142,12 +131,7 @@ public class ConverterUtils {
   }
 
   public static String toString(ContainerId cId) {
-    StringBuilder sb = new StringBuilder();
-    ApplicationId appId = cId.getAppId();
-    sb.append("container_").append(appId.getClusterTimestamp()).append("_");
-    sb.append(appIdFormat.get().format(appId.getId())).append("_");
-    sb.append(containerIdFormat.get().format(cId.getId()));
-    return sb.toString();
+    return cId.toString();
   }
 
   public static ContainerId toContainerId(RecordFactory recordFactory,
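
With toString(ContainerId) reduced to delegation, the two spellings are interchangeable; a sketch, assuming a populated ContainerId named cId:

    String viaUtil = ConverterUtils.toString(cId);
    String viaRecord = cId.toString();
    assert viaUtil.equals(viaRecord); // both yield the container_... form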

+ 4 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -104,6 +105,9 @@ public class TestEventFlow {
     ContainerLaunchContext launchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class);
     ContainerId cID = recordFactory.newRecordInstance(ContainerId.class);
     cID.setAppId(recordFactory.newRecordInstance(ApplicationId.class));
+    ApplicationAttemptId atId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
+    atId.setApplicationId(cID.getAppId());
+    cID.setAppAttemptId(atId);
     launchContext.setContainerId(cID);
     launchContext.setUser("testing");
     launchContext.setResource(recordFactory.newRecordInstance(Resource.class));

+ 6 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java

@@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.NodeHealthCheckerService;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -93,6 +94,7 @@ public class TestNodeStatusUpdater {
     }
 
     ApplicationId applicationID = recordFactory.newRecordInstance(ApplicationId.class);
+    ApplicationAttemptId appAttemptID = recordFactory.newRecordInstance(ApplicationAttemptId.class);
     ContainerId firstContainerID = recordFactory.newRecordInstance(ContainerId.class);
     ContainerId secondContainerID = recordFactory.newRecordInstance(ContainerId.class);
 
@@ -106,7 +108,9 @@ public class TestNodeStatusUpdater {
 
         // Give a container to the NM.
         applicationID.setId(heartBeatID);
+        appAttemptID.setApplicationId(applicationID);
         firstContainerID.setAppId(applicationID);
+        firstContainerID.setAppAttemptId(appAttemptID);
         firstContainerID.setId(heartBeatID);
         ContainerLaunchContext launchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class);
         launchContext.setContainerId(firstContainerID);
@@ -130,7 +134,9 @@ public class TestNodeStatusUpdater {
 
         // Give another container to the NM.
         applicationID.setId(heartBeatID);
+        appAttemptID.setApplicationId(applicationID);
         secondContainerID.setAppId(applicationID);
+        secondContainerID.setAppAttemptId(appAttemptID);
         secondContainerID.setId(heartBeatID);
         ContainerLaunchContext launchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class);
         launchContext.setContainerId(secondContainerID);

+ 22 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -46,9 +47,9 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
-import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
@@ -74,7 +75,14 @@ public class TestContainerManager extends BaseContainerManagerTest {
     boolean throwsException = false;
     try {
       GetContainerStatusRequest request = recordFactory.newRecordInstance(GetContainerStatusRequest.class);
-      request.setContainerId(recordFactory.newRecordInstance(ContainerId.class));
+      ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+      ApplicationAttemptId appAttemptId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
+      appAttemptId.setApplicationId(appId);
+      appAttemptId.setAttemptId(1);
+      ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
+      cId.setAppId(appId);
+      cId.setAppAttemptId(appAttemptId);
+      request.setContainerId(cId);
       containerManager.getContainerStatus(request);
     } catch (YarnRemoteException e) {
       throwsException = true;
@@ -99,8 +107,12 @@ public class TestContainerManager extends BaseContainerManagerTest {
 
     // ////// Construct the Container-id
     ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+    ApplicationAttemptId appAttemptId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
+    appAttemptId.setApplicationId(appId);
+    appAttemptId.setAttemptId(1);
     ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
     cId.setAppId(appId);
+    cId.setAppAttemptId(appAttemptId);
     container.setContainerId(cId);
 
     container.setUser(user);
@@ -184,8 +196,12 @@ public class TestContainerManager extends BaseContainerManagerTest {
 
     // ////// Construct the Container-id
     ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+    ApplicationAttemptId appAttemptId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
+    appAttemptId.setApplicationId(appId);
+    appAttemptId.setAttemptId(1);
     ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
     cId.setAppId(appId);
+    cId.setAppAttemptId(appAttemptId);
     containerLaunchContext.setContainerId(cId);
 
     containerLaunchContext.setUser(user);
@@ -282,8 +298,12 @@ public class TestContainerManager extends BaseContainerManagerTest {
 
     // ////// Construct the Container-id
     ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+    ApplicationAttemptId appAttemptId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
+    appAttemptId.setApplicationId(appId);
+    appAttemptId.setAttemptId(1);
     ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
     cId.setAppId(appId);
+    cId.setAppAttemptId(appAttemptId);
     container.setContainerId(cId);
 
     container.setUser(user);

+ 10 - 4
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

@@ -21,6 +21,7 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
 
 import org.junit.Test;
 import static org.junit.Assert.*;
@@ -195,7 +197,7 @@ public class TestResourceLocalizationService {
       Thread.sleep(500);
       dispatcher.await();
       String appStr = ConverterUtils.toString(appId);
-      String ctnrStr = ConverterUtils.toString(c.getContainerID());
+      String ctnrStr = c.getContainerID().toString();
       verify(exec).startLocalizer(isA(Path.class), isA(InetSocketAddress.class),
             eq("user0"), eq(appStr), eq(ctnrStr), isA(List.class));
 
@@ -270,9 +272,13 @@ public class TestResourceLocalizationService {
 
   static Container getMockContainer(ApplicationId appId, int id) {
     Container c = mock(Container.class);
-    ContainerId cId = mock(ContainerId.class);
-    when(cId.getAppId()).thenReturn(appId);
-    when(cId.getId()).thenReturn(id);
+    ApplicationAttemptId appAttemptId = Records.newRecord(ApplicationAttemptId.class);
+    appAttemptId.setApplicationId(appId);
+    appAttemptId.setAttemptId(1);
+    ContainerId cId = Records.newRecord(ContainerId.class);
+    cId.setAppAttemptId(appAttemptId);
+    cId.setAppId(appId);
+    cId.setId(id);
     when(c.getUser()).thenReturn("user0");
     when(c.getContainerID()).thenReturn(cId);
     Credentials creds = new Credentials();
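
Note: the Mockito mock is replaced with a real record because startLocalizer is now verified against c.getContainerID().toString() (see eq(ctnrStr) above), and a bare mock cannot produce the canonical string. A sketch of the difference, assuming Mockito's default answers:

    ContainerId mocked = mock(ContainerId.class);
    System.out.println(mocked); // "Mock for ContainerId, hashCode: ..."

    ContainerId real = Records.newRecord(ContainerId.class);
    // populated as in getMockContainer() above, real.toString() yields
    // the canonical container_<ts>_<app>_<attempt>_<id> string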

+ 19 - 8
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -110,8 +111,11 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
             application1, this.user, null,
             ContainerLogsRetentionPolicy.ALL_CONTAINERS));
 
+    ApplicationAttemptId appAttemptId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
+    appAttemptId.setApplicationId(application1);
+    appAttemptId.setAttemptId(1);
     ContainerId container11 =
-        BuilderUtils.newContainerId(recordFactory, application1, 1);
+        BuilderUtils.newContainerId(recordFactory, application1, appAttemptId, 1);
     // Simulate log-file creation
     writeContainerLogs(app1LogDir, container11);
     logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
@@ -188,14 +192,18 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
             application1, this.user, null,
             ContainerLogsRetentionPolicy.ALL_CONTAINERS));
 
+    ApplicationAttemptId appAttemptId1 = recordFactory.newRecordInstance(ApplicationAttemptId.class);
+    appAttemptId1.setApplicationId(application1);
     ContainerId container11 =
-        BuilderUtils.newContainerId(recordFactory, application1, 1);
+        BuilderUtils.newContainerId(recordFactory, application1, appAttemptId1, 1);
     // Simulate log-file creation
     writeContainerLogs(app1LogDir, container11);
     logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
         container11, "0"));
 
     ApplicationId application2 = BuilderUtils.newApplicationId(1234, 2);
+    ApplicationAttemptId appAttemptId2 = recordFactory.newRecordInstance(ApplicationAttemptId.class);
+    appAttemptId2.setApplicationId(application2);
 
     File app2LogDir =
       new File(localLogDir, ConverterUtils.toString(application2));
@@ -204,19 +212,22 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
         application2, this.user, null,
         ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY));
 
+    
     ContainerId container21 =
-        BuilderUtils.newContainerId(recordFactory, application2, 1);
+        BuilderUtils.newContainerId(recordFactory, application2, appAttemptId2, 1);
     writeContainerLogs(app2LogDir, container21);
     logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
         container21, "0"));
 
     ContainerId container12 =
-        BuilderUtils.newContainerId(recordFactory, application1, 2);
+        BuilderUtils.newContainerId(recordFactory, application1, appAttemptId1, 2);
     writeContainerLogs(app1LogDir, container12);
     logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
         container12, "0"));
 
     ApplicationId application3 = BuilderUtils.newApplicationId(1234, 3);
+    ApplicationAttemptId appAttemptId3 = recordFactory.newRecordInstance(ApplicationAttemptId.class);
+    appAttemptId3.setApplicationId(application3);
 
     File app3LogDir =
       new File(localLogDir, ConverterUtils.toString(application3));
@@ -226,25 +237,25 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
         ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY));
 
     ContainerId container31 =
-        BuilderUtils.newContainerId(recordFactory, application3, 1);
+        BuilderUtils.newContainerId(recordFactory, application3, appAttemptId3, 1);
     writeContainerLogs(app3LogDir, container31);
     logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
         container31, "0"));
 
     ContainerId container32 =
-        BuilderUtils.newContainerId(recordFactory, application3, 2);
+        BuilderUtils.newContainerId(recordFactory, application3, appAttemptId3, 2);
     writeContainerLogs(app3LogDir, container32);
     logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
         container32, "1")); // Failed container
 
     ContainerId container22 =
-        BuilderUtils.newContainerId(recordFactory, application2, 2);
+        BuilderUtils.newContainerId(recordFactory, application2, appAttemptId2, 2);
     writeContainerLogs(app2LogDir, container22);
     logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
         container22, "0"));
 
     ContainerId container33 =
-        BuilderUtils.newContainerId(recordFactory, application3, 3);
+        BuilderUtils.newContainerId(recordFactory, application3, appAttemptId3, 3);
     writeContainerLogs(app3LogDir, container33);
     logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
         container33, "0"));

+ 5 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -190,9 +191,13 @@ public class TestContainersMonitor extends BaseContainerManagerTest {
     // ////// Construct the Container-id
     ApplicationId appId =
         recordFactory.newRecordInstance(ApplicationId.class);
+    ApplicationAttemptId appAttemptId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
+    appAttemptId.setApplicationId(appId);
+    appAttemptId.setAttemptId(1);
     ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
     cId.setAppId(appId);
     cId.setId(0);
+    cId.setAppAttemptId(appAttemptId);
     containerLaunchContext.setContainerId(cId);
 
     containerLaunchContext.setUser(user);

+ 6 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java

@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.Writer;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -87,10 +88,13 @@ public class TestNMWebServer {
     when(app.getUser()).thenReturn(user);
     when(app.getAppId()).thenReturn(appId);
     nmContext.getApplications().put(appId, app);
+    ApplicationAttemptId appAttemptId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
+    appAttemptId.setApplicationId(appId);
+    appAttemptId.setAttemptId(1);
     ContainerId container1 =
-        BuilderUtils.newContainerId(recordFactory, appId, 0);
+        BuilderUtils.newContainerId(recordFactory, appId, appAttemptId, 0);
     ContainerId container2 =
-        BuilderUtils.newContainerId(recordFactory, appId, 1);
+        BuilderUtils.newContainerId(recordFactory, appId, appAttemptId, 1);
     NodeManagerMetrics metrics = mock(NodeManagerMetrics.class);
     for (ContainerId containerId : new ContainerId[] { container1,
         container2}) {

+ 26 - 46
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -106,9 +106,9 @@ public class RMAppImpl implements RMApp {
     .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
         RMAppEventType.ATTEMPT_REGISTERED)
     .addTransition(RMAppState.ACCEPTED,
-        EnumSet.of(RMAppState.ACCEPTED, RMAppState.FAILED),
+        EnumSet.of(RMAppState.SUBMITTED, RMAppState.FAILED),
         RMAppEventType.ATTEMPT_FAILED,
-        new AttemptFailedTransition(RMAppState.ACCEPTED))
+        new AttemptFailedTransition(RMAppState.SUBMITTED))
     .addTransition(RMAppState.ACCEPTED, RMAppState.KILLED,
         RMAppEventType.KILL, new AppKilledTransition())
 
@@ -116,23 +116,12 @@ public class RMAppImpl implements RMApp {
     .addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
         RMAppEventType.ATTEMPT_FINISHED, FINAL_TRANSITION)
     .addTransition(RMAppState.RUNNING,
-        EnumSet.of(RMAppState.RUNNING, RMAppState.FAILED),
+        EnumSet.of(RMAppState.SUBMITTED, RMAppState.FAILED),
         RMAppEventType.ATTEMPT_FAILED,
-        new AttemptFailedTransition(RMAppState.RUNNING))
+        new AttemptFailedTransition(RMAppState.SUBMITTED))
     .addTransition(RMAppState.RUNNING, RMAppState.KILLED,
         RMAppEventType.KILL, new AppKilledTransition())
 
-     // Transitions from RESTARTING state
-     // TODO - no way to get to RESTARTING state right now
-    .addTransition(RMAppState.RESTARTING, RMAppState.RUNNING,
-        RMAppEventType.ATTEMPT_REGISTERED)
-    .addTransition(RMAppState.RESTARTING,
-        EnumSet.of(RMAppState.RESTARTING, RMAppState.FAILED),
-        RMAppEventType.ATTEMPT_FAILED,
-        new AttemptFailedTransition(RMAppState.RESTARTING))
-    .addTransition(RMAppState.RESTARTING, RMAppState.KILLED,
-        RMAppEventType.KILL, new AppKilledTransition())
-
      // Transitions from FINISHED state
     .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
         RMAppEventType.KILL)
@@ -263,8 +252,6 @@ public class RMAppImpl implements RMApp {
     case SUBMITTED:
     case ACCEPTED:
       return ApplicationState.SUBMITTED;
-    case RESTARTING:
-      return ApplicationState.RESTARTING;
     case RUNNING:
       return ApplicationState.RUNNING;
     case FINISHED:
@@ -375,6 +362,21 @@ public class RMAppImpl implements RMApp {
     }
   }
 
+  private void createNewAttempt() {
+    ApplicationAttemptId appAttemptId = Records
+        .newRecord(ApplicationAttemptId.class);
+    appAttemptId.setApplicationId(applicationId);
+    appAttemptId.setAttemptId(attempts.size() + 1);
+
+    RMAppAttempt attempt = new RMAppAttemptImpl(appAttemptId,
+        clientTokenStr, rmContext, scheduler, masterService,
+        submissionContext);
+    attempts.put(appAttemptId, attempt);
+    currentAttempt = attempt;
+    dispatcher.getEventHandler().handle(
+        new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
+  }
+
   private static class RMAppTransition implements
       SingleArcTransition<RMAppImpl, RMAppEvent> {
     public void transition(RMAppImpl app, RMAppEvent event) {
@@ -384,19 +386,7 @@ public class RMAppImpl implements RMApp {
 
   private static final class StartAppAttemptTransition extends RMAppTransition {
     public void transition(RMAppImpl app, RMAppEvent event) {
-
-      ApplicationAttemptId appAttemptId = Records
-          .newRecord(ApplicationAttemptId.class);
-      appAttemptId.setApplicationId(app.applicationId);
-      appAttemptId.setAttemptId(app.attempts.size() + 1);
-
-      RMAppAttempt attempt = new RMAppAttemptImpl(appAttemptId,
-          app.clientTokenStr, app.rmContext, app.scheduler,
-          app.masterService, app.submissionContext);
-      app.attempts.put(appAttemptId, attempt);
-      app.currentAttempt = attempt;
-      app.dispatcher.getEventHandler().handle(
-          new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
+      app.createNewAttempt();
     };
   }
 
@@ -452,27 +442,17 @@ public class RMAppImpl implements RMApp {
     public RMAppState transition(RMAppImpl app, RMAppEvent event) {
 
       if (app.attempts.size() == app.maxRetries) {
-        app.diagnostics.append("Application " + app.getApplicationId()
-            + " failed " + app.maxRetries
-            + " times. Failing the application.");
+        String msg = "Application " + app.getApplicationId()
+        + " failed " + app.maxRetries
+        + " times. Failing the application.";
+        LOG.info(msg);
+        app.diagnostics.append(msg);
         // Inform the node for app-finish
         FINAL_TRANSITION.transition(app, event);
         return RMAppState.FAILED;
       }
 
-      ApplicationAttemptId appAttemptId = Records
-          .newRecord(ApplicationAttemptId.class);
-      appAttemptId.setApplicationId(app.applicationId);
-      appAttemptId.setAttemptId(app.attempts.size() + 1);
-
-      // Create a new attempt.
-      RMAppAttempt attempt = new RMAppAttemptImpl(appAttemptId,
-          app.clientTokenStr, app.rmContext, app.scheduler,
-          app.masterService, app.submissionContext);
-      app.attempts.put(appAttemptId, attempt);
-      app.currentAttempt = attempt;
-      app.dispatcher.getEventHandler().handle(
-          new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
+      app.createNewAttempt();     
       return initialState;
     }
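
Note on the restart flow: with RESTARTING gone, an attempt failure below maxRetries sends the app back to SUBMITTED, createNewAttempt() starts attempt N+1, and the app must be re-accepted and the new attempt re-registered before it is RUNNING again. The event sequence, mirroring the updated TestRMAppTransitions below (appId stands in for application.getApplicationId()):

    application.handle(new RMAppEvent(appId, RMAppEventType.ATTEMPT_FAILED));
    // -> SUBMITTED; getCurrentAppAttempt() now returns attempt N+1
    application.handle(new RMAppEvent(appId, RMAppEventType.APP_ACCEPTED));
    // -> ACCEPTED
    application.handle(new RMAppEvent(appId, RMAppEventType.ATTEMPT_REGISTERED));
    // -> RUNNING on the new attempt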
 

+ 1 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java

@@ -1,5 +1,5 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
 public enum RMAppState {
-  NEW, SUBMITTED, ACCEPTED, RUNNING, RESTARTING, FINISHED, FAILED, KILLED
+  NEW, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED
 }

+ 20 - 11
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java

@@ -18,40 +18,34 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
-import java.lang.reflect.Method;
 
 import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -306,6 +300,9 @@ public class TestRMAppTransitions {
     for (int i=1; i<maxRetries; i++) {
       RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED);
       application.handle(event);
+      assertAppState(RMAppState.SUBMITTED, application);
+      event = new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_ACCEPTED);
+      application.handle(event);
       assertAppState(RMAppState.ACCEPTED, application);
     }
 
@@ -342,10 +339,22 @@ public class TestRMAppTransitions {
     LOG.info("--- START: testAppRunningFailed ---");
 
     RMApp application = testCreateAppRunning();
+    RMAppAttempt appAttempt = application.getCurrentAppAttempt();
+    int expectedAttemptId = 1;
+    Assert.assertEquals(expectedAttemptId, appAttempt.getAppAttemptId().getAttemptId());
      // RUNNING => SUBMITTED/FAILED on RMAppEventType.ATTEMPT_FAILED
     for (int i=1; i<maxRetries; i++) {
       RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED);
       application.handle(event);
+      assertAppState(RMAppState.SUBMITTED, application);
+      appAttempt = application.getCurrentAppAttempt();
+      Assert.assertEquals(++expectedAttemptId, 
+          appAttempt.getAppAttemptId().getAttemptId());
+      event = new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_ACCEPTED);
+      application.handle(event);
+      assertAppState(RMAppState.ACCEPTED, application);
+      event = new RMAppEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_REGISTERED);
+      application.handle(event);
       assertAppState(RMAppState.RUNNING, application);
     }
 

+ 9 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java

@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationMaster;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -284,8 +285,12 @@ public class TestContainerTokenSecretManager {
                   .newRecordInstance(GetContainerStatusRequest.class);
           ContainerId containerID =
               recordFactory.newRecordInstance(ContainerId.class);
+          ApplicationAttemptId appAttemptId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
+          appAttemptId.setApplicationId(appID);
+          appAttemptId.setAttemptId(1);
           containerID.setAppId(appID);
           containerID.setId(1);
+          containerID.setAppAttemptId(appAttemptId);
           request.setContainerId(containerID);
           client.getContainerStatus(request);
         } catch (YarnRemoteException e) {
@@ -331,8 +336,12 @@ public class TestContainerTokenSecretManager {
                   .newRecordInstance(GetContainerStatusRequest.class);
         containerID =
               recordFactory.newRecordInstance(ContainerId.class);
+        ApplicationAttemptId appAttemptId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
+        appAttemptId.setApplicationId(appID);
+        appAttemptId.setAttemptId(1);
         containerID.setAppId(appID);
         containerID.setId(1);
+        containerID.setAppAttemptId(appAttemptId);
         request.setContainerId(containerID);
         try {
           client.getContainerStatus(request);