Explorar o código

YARN-3039. Implemented the app-level timeline aggregator discovery service. Contributed by Junping Du.

Zhijie Shen %!s(int64=10) %!d(string=hai) anos
pai
achega
c685d9b77a
Modificáronse 51 ficheiros con 1862 adicións e 49 borrados
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 33 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
  3. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  4. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
  5. 75 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
  6. 18 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
  7. 17 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
  8. 14 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
  9. 17 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
  10. 2 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
  11. 124 18
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
  12. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java
  13. 13 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  14. 15 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
  15. 15 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java
  16. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
  17. 56 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocol.java
  18. 33 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocolPB.java
  19. 94 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/AggregatorNodemanagerProtocolPBClientImpl.java
  20. 61 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/AggregatorNodemanagerProtocolPBServiceImpl.java
  21. 22 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
  22. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
  23. 53 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoRequest.java
  24. 32 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoResponse.java
  25. 59 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
  26. 47 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
  27. 142 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoRequestPBImpl.java
  28. 74 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoResponsePBImpl.java
  29. 33 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppAggregatorsMap.java
  30. 151 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppAggregatorsMapPBImpl.java
  31. 29 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/aggregatornodemanager_protocol.proto
  32. 21 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
  33. 100 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
  34. 17 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
  35. 13 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
  36. 43 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
  37. 6 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
  38. 113 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/aggregatormanager/NMAggregatorService.java
  39. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
  40. 10 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
  41. 6 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
  42. 65 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
  43. 17 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
  44. 36 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java
  45. 3 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
  46. 50 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
  47. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
  48. 14 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
  49. 2 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
  50. 73 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
  51. 6 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -29,6 +29,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3264. Created backing storage write interface and a POC only FS based
     storage implementation. (Vrushali C via zjshen)
 
+    YARN-3039. Implemented the app-level timeline aggregator discovery service.
+    (Junping Du via zjshen)
+
   IMPROVEMENTS
 
   OPTIMIZATIONS

+ 33 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java

@@ -127,6 +127,25 @@ public abstract class AllocateResponse {
     response.setAMRMToken(amRMToken);
     return response;
   }
+  
+  @Public
+  @Unstable
+  public static AllocateResponse newInstance(int responseId,
+      List<ContainerStatus> completedContainers,
+      List<Container> allocatedContainers, List<NodeReport> updatedNodes,
+      Resource availResources, AMCommand command, int numClusterNodes,
+      PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken,
+      List<Container> increasedContainers,
+      List<Container> decreasedContainers,
+      String aggregatorAddr) {
+    AllocateResponse response =
+        newInstance(responseId, completedContainers, allocatedContainers,
+          updatedNodes, availResources, command, numClusterNodes, preempt,
+          nmTokens, increasedContainers, decreasedContainers);
+    response.setAMRMToken(amRMToken);
+    response.setAggregatorAddr(aggregatorAddr);
+    return response;
+  }
 
   /**
    * If the <code>ResourceManager</code> needs the
@@ -328,4 +347,18 @@ public abstract class AllocateResponse {
   @Private
   @Unstable
   public abstract void setApplicationPriority(Priority priority);
+
+  /**
+   * The address of aggregator that belong to this app
+   *
+   * @return The address of aggregator that belong to this attempt
+   */
+  @Public
+  @Unstable
+  public abstract String getAggregatorAddr();
+  
+  @Private
+  @Unstable
+  public abstract void setAggregatorAddr(String aggregatorAddr);
+
 }

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -698,6 +698,11 @@ public class YarnConfiguration extends Configuration {
     NM_PREFIX + "container-manager.thread-count";
   public static final int DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT = 20;
   
+  /** Number of threads container manager uses.*/
+  public static final String NM_AGGREGATOR_SERVICE_THREAD_COUNT =
+    NM_PREFIX + "aggregator-service.thread-count";
+  public static final int DEFAULT_NM_AGGREGATOR_SERVICE_THREAD_COUNT = 5;
+  
   /** Number of threads used in cleanup.*/
   public static final String NM_DELETE_THREAD_COUNT = 
     NM_PREFIX +  "delete.thread-count";
@@ -725,6 +730,13 @@ public class YarnConfiguration extends Configuration {
   public static final String DEFAULT_NM_LOCALIZER_ADDRESS = "0.0.0.0:" +
     DEFAULT_NM_LOCALIZER_PORT;
   
+  /** Address where the aggregator service IPC is.*/
+  public static final String NM_AGGREGATOR_SERVICE_ADDRESS =
+    NM_PREFIX + "aggregator-service.address";
+  public static final int DEFAULT_NM_AGGREGATOR_SERVICE_PORT = 8048;
+  public static final String DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS = 
+      "0.0.0.0:" + DEFAULT_NM_LOCALIZER_PORT;
+  
   /** Interval in between cache cleanups.*/
   public static final String NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS =
     NM_PREFIX + "localizer.cache.cleanup.interval-ms";

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto

@@ -89,6 +89,7 @@ message AllocateResponseProto {
   repeated ContainerProto decreased_containers = 11;
   optional hadoop.common.TokenProto am_rm_token = 12;
   optional PriorityProto application_priority = 13;
+  optional string aggregator_addr = 14;
 }
 
 enum SchedulerResourceTypes {

+ 75 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java

@@ -40,6 +40,9 @@ import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
@@ -102,6 +105,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.log4j.LogManager;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * An ApplicationMaster for executing shell commands on a set of launched
@@ -214,6 +218,13 @@ public class ApplicationMaster {
   private String appMasterTrackingUrl = "";
   
   private boolean newTimelineService = false;
+  
+  // For posting entities in new timeline service in a non-blocking way
+  // TODO replace with event loop in TimelineClient.
+  private static ExecutorService threadPool = 
+      Executors.newCachedThreadPool(
+          new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
+          .build());
 
   // App Master configuration
   // No. of containers to run shell command on
@@ -303,6 +314,19 @@ public class ApplicationMaster {
       }
       appMaster.run();
       result = appMaster.finish();
+      
+      threadPool.shutdown();
+      
+      while (!threadPool.isTerminated()) { // wait for all posting thread to finish
+        try {
+          if (!threadPool.awaitTermination(30, TimeUnit.SECONDS)) {
+            threadPool.shutdownNow(); // send interrupt to hurry them along
+          }
+        } catch (InterruptedException e) {
+          LOG.warn("Timeline client service stop interrupted!");
+          break;
+        }
+      }
     } catch (Throwable t) {
       LOG.fatal("Error running ApplicationMaster", t);
       LogManager.shutdown();
@@ -587,13 +611,15 @@ public class ApplicationMaster {
     amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
     amRMClient.init(conf);
     amRMClient.start();
-
+   
     containerListener = createNMCallbackHandler();
     nmClientAsync = new NMClientAsyncImpl(containerListener);
     nmClientAsync.init(conf);
     nmClientAsync.start();
 
     startTimelineClient(conf);
+    // need to bind timelineClient
+    amRMClient.registerTimelineClient(timelineClient);
     if(timelineClient != null) {
       if (newTimelineService) {
         publishApplicationAttemptEventOnNewTimelineService(timelineClient, 
@@ -675,7 +701,12 @@ public class ApplicationMaster {
           if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
               YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
             // Creating the Timeline Client
-            timelineClient = TimelineClient.createTimelineClient();
+            if (newTimelineService) {
+              timelineClient = TimelineClient.createTimelineClient(
+                  appAttemptID.getApplicationId());
+            } else {
+              timelineClient = TimelineClient.createTimelineClient();
+            }
             timelineClient.init(conf);
             timelineClient.start();
           } else {
@@ -765,7 +796,7 @@ public class ApplicationMaster {
     if(timelineClient != null) {
       timelineClient.stop();
     }
-
+    
     return success;
   }
 
@@ -1291,6 +1322,18 @@ public class ApplicationMaster {
   }
   
   private static void publishContainerStartEventOnNewTimelineService(
+      final TimelineClient timelineClient, final Container container, 
+      final String domainId, final UserGroupInformation ugi) {
+    Runnable publishWrapper = new Runnable() {
+      public void run() {
+        publishContainerStartEventOnNewTimelineServiceBase(timelineClient, 
+            container, domainId, ugi);
+      }
+    };
+    threadPool.execute(publishWrapper);
+  }
+
+  private static void publishContainerStartEventOnNewTimelineServiceBase(
       final TimelineClient timelineClient, Container container, String domainId,
       UserGroupInformation ugi) {
     final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = 
@@ -1322,10 +1365,22 @@ public class ApplicationMaster {
           e instanceof UndeclaredThrowableException ? e.getCause() : e);
     }
   }
-
+  
   private static void publishContainerEndEventOnNewTimelineService(
-      final TimelineClient timelineClient, ContainerStatus container,
-      String domainId, UserGroupInformation ugi) {
+      final TimelineClient timelineClient, final ContainerStatus container,
+      final String domainId, final UserGroupInformation ugi) {
+    Runnable publishWrapper = new Runnable() {
+      public void run() {
+          publishContainerEndEventOnNewTimelineServiceBase(timelineClient, 
+              container, domainId, ugi);
+      }
+    };
+    threadPool.execute(publishWrapper);
+  }
+  
+  private static void publishContainerEndEventOnNewTimelineServiceBase(
+      final TimelineClient timelineClient, final ContainerStatus container,
+      final String domainId, final UserGroupInformation ugi) {
     final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = 
         new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
     entity.setId(container.getContainerId().toString());
@@ -1356,6 +1411,20 @@ public class ApplicationMaster {
   }
 
   private static void publishApplicationAttemptEventOnNewTimelineService(
+      final TimelineClient timelineClient, final String appAttemptId,
+      final DSEvent appEvent, final String domainId, 
+      final UserGroupInformation ugi) {
+  
+    Runnable publishWrapper = new Runnable() {
+      public void run() {
+        publishApplicationAttemptEventOnNewTimelineServiceBase(timelineClient, 
+            appAttemptId, appEvent, domainId, ugi);
+      }
+    };
+    threadPool.execute(publishWrapper);
+  }
+  
+  private static void publishApplicationAttemptEventOnNewTimelineServiceBase(
       final TimelineClient timelineClient, String appAttemptId,
       DSEvent appEvent, String domainId, UserGroupInformation ugi) {
     final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = 

+ 18 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java

@@ -49,6 +49,8 @@ import com.google.common.collect.ImmutableList;
 public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
     AbstractService {
   private static final Log LOG = LogFactory.getLog(AMRMClient.class);
+  
+  private TimelineClient timelineClient;
 
   /**
    * Create a new instance of AMRMClient.
@@ -394,6 +396,22 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
     return nmTokenCache;
   }
 
+  /**
+   * Register TimelineClient to AMRMClient.
+   * @param timelineClient
+   */
+  public void registerTimelineClient(TimelineClient timelineClient) {
+    this.timelineClient = timelineClient;
+  }
+  
+  /**
+   * Get registered timeline client.
+   * @return
+   */
+  public TimelineClient getRegisteredTimeineClient() {
+    return this.timelineClient;
+  }
+  
   /**
    * Wait for <code>check</code> to return true for each 1000 ms.
    * See also {@link #waitFor(com.google.common.base.Supplier, int)}

+ 17 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -275,6 +276,22 @@ extends AbstractService {
    * @return Current number of nodes in the cluster
    */
   public abstract int getClusterNodeCount();
+  
+  /**
+   * Register TimelineClient to AMRMClient.
+   * @param timelineClient
+   */
+  public void registerTimelineClient(TimelineClient timelineClient) {
+    client.registerTimelineClient(timelineClient);
+  }
+  
+  /**
+   * Get registered timeline client.
+   * @return
+   */
+  public TimelineClient getRegisteredTimeineClient() {
+    return client.getRegisteredTimeineClient();
+  }
 
   /**
    * Update application's blacklist with addition or removal resources.

+ 14 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -66,6 +67,8 @@ extends AMRMClientAsync<T> {
   private volatile boolean keepRunning;
   private volatile float progress;
   
+  private volatile String aggregatorAddr;
+  
   private volatile Throwable savedException;
 
   /**
@@ -351,7 +354,17 @@ extends AMRMClientAsync<T> {
           if (!allocated.isEmpty()) {
             handler.onContainersAllocated(allocated);
           }
-
+          
+          String aggregatorAddress = response.getAggregatorAddr();
+          TimelineClient timelineClient = client.getRegisteredTimeineClient();
+          if (timelineClient != null && aggregatorAddress != null 
+              && !aggregatorAddress.isEmpty()) {
+            if (aggregatorAddr == null || 
+                !aggregatorAddr.equals(aggregatorAddress)) {
+              aggregatorAddr = aggregatorAddress;
+              timelineClient.setTimelineServiceAddress(aggregatorAddress);
+            }
+          }
           progress = handler.getProgress();
         } catch (Throwable ex) {
           handler.onError(ex);

+ 17 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java

@@ -384,6 +384,23 @@ public class AllocateResponsePBImpl extends AllocateResponse {
     }
     this.amrmToken = amRMToken;
   }
+  
+
+  @Override
+  public String getAggregatorAddr() {
+    AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getAggregatorAddr();
+  }
+  
+  @Override
+  public void setAggregatorAddr(String aggregatorAddr) {
+    maybeInitBuilder();
+    if (aggregatorAddr == null) {
+      builder.clearAggregatorAddr();
+      return;
+    }
+    builder.setAggregatorAddr(aggregatorAddr);
+  }
 
   @Override
   public synchronized Priority getApplicationPriority() {

+ 2 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java

@@ -56,7 +56,6 @@ public abstract class TimelineClient extends AbstractService {
    * @return a timeline client
    */
   protected ApplicationId contextAppId;
-  protected String timelineServiceAddress;
 
   @Public
   public static TimelineClient createTimelineClient() {
@@ -240,7 +239,6 @@ public abstract class TimelineClient extends AbstractService {
    * @param address
    *          the timeline service address
    */
-  public void setTimelineServiceAddress(String address) {
-    timelineServiceAddress = address;
-  }
+  public abstract void setTimelineServiceAddress(String address);
+  
 }

+ 124 - 18
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java

@@ -117,6 +117,15 @@ public class TimelineClientImpl extends TimelineClient {
   private float timelineServiceVersion;
   private TimelineWriter timelineWriter;
 
+  private volatile String timelineServiceAddress;
+  
+  // Retry parameters for identifying new timeline service
+  // TODO consider to merge with connection retry
+  private int maxServiceRetries;
+  private long serviceRetryInterval;
+  
+  private boolean timelineServiceV2 = false;
+
   @Private
   @VisibleForTesting
   TimelineClientConnectionRetry connectionRetry;
@@ -261,6 +270,7 @@ public class TimelineClientImpl extends TimelineClient {
 
   public TimelineClientImpl(ApplicationId applicationId) {
     super(TimelineClientImpl.class.getName(), applicationId);
+    this.timelineServiceV2 = true;
   }
 
   protected void serviceInit(Configuration conf) throws Exception {
@@ -289,22 +299,35 @@ public class TimelineClientImpl extends TimelineClient {
     client = new Client(new URLConnectionClientHandler(
         new TimelineURLConnectionFactory()), cc);
     TimelineJerseyRetryFilter retryFilter = new TimelineJerseyRetryFilter();
-    client.addFilter(retryFilter);
+    // TODO need to cleanup filter retry later.
+    if (!timelineServiceV2) {
+      client.addFilter(retryFilter);
+    }
 
-    if (YarnConfiguration.useHttps(conf)) {
-      timelineServiceAddress = conf.get(
-          YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
-          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS);
+    // old version timeline service need to get address from configuration
+    // while new version need to auto discovery (with retry).
+    if (timelineServiceV2) {
+      maxServiceRetries = conf.getInt(
+          YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
+          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
+      serviceRetryInterval = conf.getLong(
+          YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
+          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
     } else {
-      timelineServiceAddress = conf.get(
-          YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
-          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS);
-    }
-    LOG.info("Timeline service address: " + resURI);
-    timelineServiceVersion =
-        conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
-          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
-    LOG.info("Timeline service address: " + timelineServiceAddress);
+      if (YarnConfiguration.useHttps(conf)) {
+        setTimelineServiceAddress(conf.get(
+            YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS));
+      } else {
+        setTimelineServiceAddress(conf.get(
+            YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS));
+      }
+      timelineServiceVersion =
+          conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
+              YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
+      LOG.info("Timeline service address: " + getTimelineServiceAddress());
+    } 
     super.serviceInit(conf);
   }
 
@@ -369,8 +392,7 @@ public class TimelineClientImpl extends TimelineClient {
     if (async) {
       params.add("async", Boolean.TRUE.toString());
     }
-    putObjects(constructResURI(getConfig(), timelineServiceAddress, true),
-        "entities", params, entitiesContainer);
+    putObjects("entities", params, entitiesContainer);
   }
 
   @Override
@@ -378,6 +400,60 @@ public class TimelineClientImpl extends TimelineClient {
       YarnException {
     timelineWriter.putDomain(domain);
   }
+  
+  // Used for new timeline service only
+  @Private
+  public void putObjects(String path, MultivaluedMap<String, String> params, 
+      Object obj) throws IOException, YarnException {
+    
+    // timelineServiceAddress could haven't be initialized yet 
+    // or stale (only for new timeline service)
+    int retries = pollTimelineServiceAddress(this.maxServiceRetries);
+    
+    // timelineServiceAddress could be stale, add retry logic here.
+    boolean needRetry = true;
+    while (needRetry) {
+      try {
+        URI uri = constructResURI(getConfig(), timelineServiceAddress, true);
+        putObjects(uri, path, params, obj);
+        needRetry = false;
+      }
+      catch (Exception e) {
+        // TODO only handle exception for timelineServiceAddress being updated.
+        // skip retry for other exceptions.
+        checkRetryWithSleep(retries, e);
+        retries--;
+      }
+    }
+  }
+  
+  /**
+   * Check if reaching to maximum of retries.
+   * @param retries
+   * @param e
+   */
+  private void checkRetryWithSleep(int retries, Exception e) throws 
+      YarnException, IOException {
+    if (retries > 0) {
+      try {
+        Thread.sleep(this.serviceRetryInterval);
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+      }
+    } else {
+      LOG.error(
+        "TimelineClient has reached to max retry times :" + 
+        this.maxServiceRetries + " for service address: " + 
+        timelineServiceAddress);
+      if (e instanceof YarnException) {
+        throw (YarnException)e;
+      } else if (e instanceof IOException) {
+        throw (IOException)e;
+      } else {
+        throw new YarnException(e);
+      }
+    }
+  }
 
   private void putObjects(
       URI base, String path, MultivaluedMap<String, String> params, Object obj)
@@ -409,11 +485,21 @@ public class TimelineClientImpl extends TimelineClient {
     }
   }
 
+  @Override
+  public void setTimelineServiceAddress(String address) {
+    this.timelineServiceAddress = address;
+  }
+  
+  private String getTimelineServiceAddress() {
+    return this.timelineServiceAddress;
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public Token<TimelineDelegationTokenIdentifier> getDelegationToken(
       final String renewer) throws IOException, YarnException {
-    PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>> getDTAction =
+    PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>
+        getDTAction =
         new PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>() {
 
           @Override
@@ -422,8 +508,10 @@ public class TimelineClientImpl extends TimelineClient {
             DelegationTokenAuthenticatedURL authUrl =
                 new DelegationTokenAuthenticatedURL(authenticator,
                     connConfigurator);
+            // TODO we should add retry logic here if timelineServiceAddress is
+            // not available immediately.
             return (Token) authUrl.getDelegationToken(
-                constructResURI(getConfig(), timelineServiceAddress, false).toURL(),
+                constructResURI(getConfig(), getTimelineServiceAddress(), false).toURL(),
                 token, renewer, doAsUser);
           }
         };
@@ -533,6 +621,24 @@ public class TimelineClientImpl extends TimelineClient {
     return connectionRetry.retryOn(tokenRetryOp);
   }
 
+  /**
+   * Poll TimelineServiceAddress for maximum of retries times if it is null
+   * @param retries
+   * @return the left retry times
+   */
+  private int pollTimelineServiceAddress(int retries) {
+    while (timelineServiceAddress == null && retries > 0) {
+      try {
+        Thread.sleep(this.serviceRetryInterval);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      timelineServiceAddress = getTimelineServiceAddress();
+      retries--;
+    }
+    return retries;
+  }
+
   private class TimelineURLConnectionFactory
       implements HttpURLConnectionFactory {
 

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java

@@ -201,7 +201,7 @@ public class WebAppUtils {
     return getResolvedAddress(address);
   }
 
-  private static String getResolvedAddress(InetSocketAddress address) {
+  public static String getResolvedAddress(InetSocketAddress address) {
     address = NetUtils.getConnectAddress(address);
     StringBuilder sb = new StringBuilder();
     InetAddress resolved = address.getAddress();

+ 13 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -951,6 +951,12 @@
     <name>yarn.nodemanager.container-manager.thread-count</name>
     <value>20</value>
   </property>
+  
+    <property>
+    <description>Number of threads aggregator service uses.</description>
+    <name>yarn.nodemanager.aggregator-service.thread-count</name>
+    <value>5</value>
+  </property>
 
   <property>
     <description>Number of threads used in cleanup.</description>
@@ -1019,6 +1025,13 @@
     <name>yarn.nodemanager.localizer.address</name>
     <value>${yarn.nodemanager.hostname}:8040</value>
   </property>
+  
+  
+  <property>
+    <description>Address where the aggregator service IPC is.</description>
+    <name>yarn.nodemanager.aggregator-service.address</name>
+    <value>${yarn.nodemanager.hostname}:8048</value>
+  </property>
 
   <property>
     <description>Interval in between cache cleanups.</description>

+ 15 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java

@@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
@@ -107,7 +108,7 @@ public class TestContainerLaunchRPC {
             resource, System.currentTimeMillis() + 10000, 42, 42,
             Priority.newInstance(0), 0);
       Token containerToken =
-          TestRPC.newContainerToken(nodeId, "password".getBytes(),
+          newContainerToken(nodeId, "password".getBytes(),
             containerTokenIdentifier);
 
       StartContainerRequest scRequest =
@@ -132,6 +133,19 @@ public class TestContainerLaunchRPC {
 
     Assert.fail("timeout exception should have occurred!");
   }
+  
+  public static Token newContainerToken(NodeId nodeId, byte[] password,
+      ContainerTokenIdentifier tokenIdentifier) {
+    // RPC layer client expects ip:port as service for tokens
+    InetSocketAddress addr =
+        NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort());
+    // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
+    Token containerToken =
+        Token.newInstance(tokenIdentifier.getBytes(),
+          ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil
+            .buildTokenService(addr).toString());
+    return containerToken;
+  }
 
   public class DummyContainerManager implements ContainerManagementProtocol {
 

+ 15 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerResourceIncreaseRPC.java

@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
@@ -96,7 +97,7 @@ public class TestContainerResourceIncreaseRPC {
               resource, System.currentTimeMillis() + 10000, 42, 42,
                   Priority.newInstance(0), 0);
       Token containerToken =
-          TestRPC.newContainerToken(nodeId, "password".getBytes(),
+          newContainerToken(nodeId, "password".getBytes(),
               containerTokenIdentifier);
       // Construct container resource increase request,
       List<Token> increaseTokens = new ArrayList<>();
@@ -119,6 +120,19 @@ public class TestContainerResourceIncreaseRPC {
     Assert.fail("timeout exception should have occurred!");
   }
 
+  public static Token newContainerToken(NodeId nodeId, byte[] password,
+      ContainerTokenIdentifier tokenIdentifier) {
+    // RPC layer client expects ip:port as service for tokens
+    InetSocketAddress addr =
+        NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort());
+    // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
+    Token containerToken =
+        Token.newInstance(tokenIdentifier.getBytes(),
+          ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil
+            .buildTokenService(addr).toString());
+    return containerToken;
+  }
+
   public class DummyContainerManager implements ContainerManagementProtocol {
 
     @Override

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml

@@ -147,6 +147,7 @@
                   <include>yarn_server_common_service_protos.proto</include>
                   <include>ResourceTracker.proto</include>
                   <include>SCMUploader.proto</include>
+                  <include>aggregatornodemanager_protocol.proto</include>
                 </includes>
               </source>
               <output>${project.build.directory}/generated-sources/java</output>

+ 56 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocol.java

@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
+
+/**
+ * <p>The protocol between an <code>TimelineAggregatorsCollection</code> and a 
+ * <code>NodeManager</code> to report a new application aggregator get launched.
+ * </p>
+ * 
+ */
+@Private
+public interface AggregatorNodemanagerProtocol {
+
+  /**
+   * 
+   * <p>
+   * The <code>TimelineAggregatorsCollection</code> provides a list of mapping
+   * between application and aggregator's address in 
+   * {@link ReportNewAggregatorsInfoRequest} to a <code>NodeManager</code> to
+   * <em>register</em> aggregator's info, include: applicationId and REST URI to 
+   * access aggregator. NodeManager will add them into registered aggregators 
+   * and register them into <code>ResourceManager</code> afterwards.
+   * </p>
+   * 
+   * @param request the request of registering a new aggregator or a list of aggregators
+   * @return 
+   * @throws YarnException
+   * @throws IOException
+   */
+  ReportNewAggregatorsInfoResponse reportNewAggregatorInfo(
+      ReportNewAggregatorsInfoRequest request)
+      throws YarnException, IOException;
+  
+}

+ 33 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocolPB.java

@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.yarn.proto.AggregatorNodemanagerProtocol.AggregatorNodemanagerProtocolService;
+
+@Private
+@Unstable
+@ProtocolInfo(
+    protocolName = "org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocolPB",
+    protocolVersion = 1)
+public interface AggregatorNodemanagerProtocolPB extends 
+    AggregatorNodemanagerProtocolService.BlockingInterface {
+
+}

+ 94 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/AggregatorNodemanagerProtocolPBClientImpl.java

@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.impl.pb.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProto;
+import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocolPB;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoResponsePBImpl;
+
+import com.google.protobuf.ServiceException;
+
+public class AggregatorNodemanagerProtocolPBClientImpl implements
+    AggregatorNodemanagerProtocol, Closeable {
+
+  // Not a documented config. Only used for tests internally
+  static final String NM_COMMAND_TIMEOUT = YarnConfiguration.YARN_PREFIX
+      + "rpc.nm-command-timeout";
+
+  /**
+   * Maximum of 1 minute timeout for a Node to react to the command
+   */
+  static final int DEFAULT_COMMAND_TIMEOUT = 60000;
+  
+  private AggregatorNodemanagerProtocolPB proxy;
+  
+  @Private
+  public AggregatorNodemanagerProtocolPBClientImpl(long clientVersion,
+      InetSocketAddress addr, Configuration conf) throws IOException {
+    RPC.setProtocolEngine(conf, AggregatorNodemanagerProtocolPB.class,
+      ProtobufRpcEngine.class);
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+    int expireIntvl = conf.getInt(NM_COMMAND_TIMEOUT, DEFAULT_COMMAND_TIMEOUT);
+    proxy =
+        (AggregatorNodemanagerProtocolPB) RPC.getProxy(
+            AggregatorNodemanagerProtocolPB.class,
+            clientVersion, addr, ugi, conf,
+            NetUtils.getDefaultSocketFactory(conf), expireIntvl);
+  }
+  
+  @Override
+  public ReportNewAggregatorsInfoResponse reportNewAggregatorInfo(
+      ReportNewAggregatorsInfoRequest request) throws YarnException, IOException {
+  
+    ReportNewAggregatorsInfoRequestProto requestProto =
+        ((ReportNewAggregatorsInfoRequestPBImpl) request).getProto();
+    try {
+      return new ReportNewAggregatorsInfoResponsePBImpl(
+          proxy.reportNewAggregatorInfo(null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+  
+  @Override
+  public void close() {
+    if (this.proxy != null) {
+      RPC.stopProxy(this.proxy);
+    }
+  }
+
+}

+ 61 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/AggregatorNodemanagerProtocolPBServiceImpl.java

@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.impl.pb.service;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoResponseProto;
+import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocolPB;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoResponsePBImpl;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+public class AggregatorNodemanagerProtocolPBServiceImpl implements
+    AggregatorNodemanagerProtocolPB {
+
+  private AggregatorNodemanagerProtocol real;
+  
+  public AggregatorNodemanagerProtocolPBServiceImpl(AggregatorNodemanagerProtocol impl) {
+    this.real = impl;
+  }
+
+  @Override
+  public ReportNewAggregatorsInfoResponseProto reportNewAggregatorInfo(
+      RpcController arg0, ReportNewAggregatorsInfoRequestProto proto) 
+      throws ServiceException {
+    ReportNewAggregatorsInfoRequestPBImpl request = 
+        new ReportNewAggregatorsInfoRequestPBImpl(proto);
+    try {
+      ReportNewAggregatorsInfoResponse response = real.reportNewAggregatorInfo(request);
+      return ((ReportNewAggregatorsInfoResponsePBImpl)response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+}

+ 22 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java

@@ -19,8 +19,10 @@
 package org.apache.hadoop.yarn.server.api.protocolrecords;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@@ -41,6 +43,22 @@ public abstract class NodeHeartbeatRequest {
     nodeHeartbeatRequest.setNodeLabels(nodeLabels);
     return nodeHeartbeatRequest;
   }
+  
+  public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
+      MasterKey lastKnownContainerTokenMasterKey,
+      MasterKey lastKnownNMTokenMasterKey, Set<NodeLabel> nodeLabels,
+      Map<ApplicationId, String> registeredAggregators) {
+    NodeHeartbeatRequest nodeHeartbeatRequest =
+        Records.newRecord(NodeHeartbeatRequest.class);
+    nodeHeartbeatRequest.setNodeStatus(nodeStatus);
+    nodeHeartbeatRequest
+        .setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey);
+    nodeHeartbeatRequest
+        .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey);
+    nodeHeartbeatRequest.setNodeLabels(nodeLabels);
+    nodeHeartbeatRequest.setRegisteredAggregators(registeredAggregators);
+    return nodeHeartbeatRequest;
+  }
 
   public abstract NodeStatus getNodeStatus();
   public abstract void setNodeStatus(NodeStatus status);
@@ -59,4 +77,8 @@ public abstract class NodeHeartbeatRequest {
 
   public abstract void setLogAggregationReportsForApps(
       List<LogAggregationReport> logAggregationReportsForApps);
+
+  // This tells RM registered aggregators' address info on this node
+  public abstract Map<ApplicationId, String> getRegisteredAggregators();
+  public abstract void setRegisteredAggregators(Map<ApplicationId, String> appAggregatorsMap);
 }

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java

@@ -38,6 +38,10 @@ public interface NodeHeartbeatResponse {
   List<ContainerId> getContainersToBeRemovedFromNM();
 
   List<ApplicationId> getApplicationsToCleanup();
+  
+  // This tells NM the aggregators' address info of related Apps
+  Map<ApplicationId, String> getAppAggregatorsMap();
+  void setAppAggregatorsMap(Map<ApplicationId, String> appAggregatorsMap);
 
   void setResponseId(int responseId);
   void setNodeAction(NodeAction action);

+ 53 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoRequest.java

@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import java.util.List;
+import java.util.Arrays;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
+import org.apache.hadoop.yarn.util.Records;
+
+@Private
+public abstract class ReportNewAggregatorsInfoRequest {
+  
+  public static ReportNewAggregatorsInfoRequest newInstance(
+      List<AppAggregatorsMap> appAggregatorsList) {
+    ReportNewAggregatorsInfoRequest request =
+        Records.newRecord(ReportNewAggregatorsInfoRequest.class);
+    request.setAppAggregatorsList(appAggregatorsList);
+    return request;
+  }
+  
+  public static ReportNewAggregatorsInfoRequest newInstance(
+      ApplicationId id, String aggregatorAddr) {
+    ReportNewAggregatorsInfoRequest request =
+        Records.newRecord(ReportNewAggregatorsInfoRequest.class);
+    request.setAppAggregatorsList(
+        Arrays.asList(AppAggregatorsMap.newInstance(id, aggregatorAddr)));
+    return request;
+  }
+  
+  public abstract List<AppAggregatorsMap> getAppAggregatorsList();
+  
+  public abstract void setAppAggregatorsList(
+      List<AppAggregatorsMap> appAggregatorsList);
+  
+}

+ 32 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoResponse.java

@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.util.Records;
+
+public abstract class ReportNewAggregatorsInfoResponse {
+
+  @Private
+  public static ReportNewAggregatorsInfoResponse newInstance() {
+    ReportNewAggregatorsInfoResponse response =
+        Records.newRecord(ReportNewAggregatorsInfoResponse.class);
+    return response;
+  }
+
+}

+ 59 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java

@@ -19,16 +19,22 @@
 package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder;
@@ -52,6 +58,8 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
   private Set<NodeLabel> labels = null;
   private List<LogAggregationReport> logAggregationReportsForApps = null;
 
+  Map<ApplicationId, String> registeredAggregators = null;
+  
   public NodeHeartbeatRequestPBImpl() {
     builder = NodeHeartbeatRequestProto.newBuilder();
   }
@@ -106,6 +114,9 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
     if (this.logAggregationReportsForApps != null) {
       addLogAggregationStatusForAppsToProto();
     }
+    if (this.registeredAggregators != null) {
+      addRegisteredAggregatorsToProto();
+    }
   }
 
   private void addLogAggregationStatusForAppsToProto() {
@@ -146,6 +157,16 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
       LogAggregationReport value) {
     return ((LogAggregationReportPBImpl) value).getProto();
   }
+  
+  private void addRegisteredAggregatorsToProto() {
+    maybeInitBuilder();
+    builder.clearRegisteredAggregators();
+    for (Map.Entry<ApplicationId, String> entry : registeredAggregators.entrySet()) {
+      builder.addRegisteredAggregators(AppAggregatorsMapProto.newBuilder()
+        .setAppId(convertToProtoFormat(entry.getKey()))
+        .setAppAggregatorAddr(entry.getValue()));
+    }
+  }
 
   private void mergeLocalToProto() {
     if (viaProto) 
@@ -227,6 +248,36 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
       builder.clearLastKnownNmTokenMasterKey();
     this.lastKnownNMTokenMasterKey = masterKey;
   }
+  
+  @Override
+  public Map<ApplicationId, String> getRegisteredAggregators() {
+    if (this.registeredAggregators != null) {
+      return this.registeredAggregators;
+    }
+    initRegisteredAggregators();
+    return registeredAggregators;
+  }
+  
+  private void initRegisteredAggregators() {
+    NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
+    List<AppAggregatorsMapProto> list = p.getRegisteredAggregatorsList();
+    this.registeredAggregators = new HashMap<ApplicationId, String> ();
+    for (AppAggregatorsMapProto c : list) {
+      ApplicationId appId = convertFromProtoFormat(c.getAppId());
+      this.registeredAggregators.put(appId, c.getAppAggregatorAddr());
+    }
+  }
+  
+  @Override
+  public void setRegisteredAggregators(
+      Map<ApplicationId, String> registeredAggregators) {
+    if (registeredAggregators == null || registeredAggregators.isEmpty()) {
+      return;
+    }
+    maybeInitBuilder();
+    this.registeredAggregators = new HashMap<ApplicationId, String>();
+    this.registeredAggregators.putAll(registeredAggregators);
+  }
 
   private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto p) {
     return new NodeStatusPBImpl(p);
@@ -235,6 +286,14 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
   private NodeStatusProto convertToProtoFormat(NodeStatus t) {
     return ((NodeStatusPBImpl)t).getProto();
   }
+  
+  private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
+    return new ApplicationIdPBImpl(p);
+  }
+  
+  private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+    return ((ApplicationIdPBImpl) t).getProto();
+  }
 
   private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) {
     return new MasterKeyPBImpl(p);

+ 47 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java

@@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
@@ -62,6 +63,8 @@ public class NodeHeartbeatResponsePBImpl extends
   private List<ContainerId> containersToBeRemovedFromNM = null;
   private List<ApplicationId> applicationsToCleanup = null;
   private Map<ApplicationId, ByteBuffer> systemCredentials = null;
+  
+  Map<ApplicationId, String> appAggregatorsMap = null;
 
   private MasterKey containerTokenMasterKey = null;
   private MasterKey nmTokenMasterKey = null;
@@ -111,6 +114,10 @@ public class NodeHeartbeatResponsePBImpl extends
     if (this.containersToSignal != null) {
       addContainersToSignalToProto();
     }
+
+    if (this.appAggregatorsMap != null) {
+      addAppAggregatorsMapToProto();
+    }
   }
 
   private void addSystemCredentialsToProto() {
@@ -123,6 +130,16 @@ public class NodeHeartbeatResponsePBImpl extends
             entry.getValue().duplicate())));
     }
   }
+  
+  private void addAppAggregatorsMapToProto() {
+    maybeInitBuilder();
+    builder.clearAppAggregatorsMap();
+    for (Map.Entry<ApplicationId, String> entry : appAggregatorsMap.entrySet()) {
+      builder.addAppAggregatorsMap(AppAggregatorsMapProto.newBuilder()
+        .setAppId(convertToProtoFormat(entry.getKey()))
+        .setAppAggregatorAddr(entry.getValue()));
+    }
+  }
 
   private void mergeLocalToProto() {
     if (viaProto) 
@@ -490,6 +507,15 @@ public class NodeHeartbeatResponsePBImpl extends
     initSystemCredentials();
     return systemCredentials;
   }
+  
+  @Override
+  public Map<ApplicationId, String> getAppAggregatorsMap() {
+    if (this.appAggregatorsMap != null) {
+      return this.appAggregatorsMap;
+    }
+    initAppAggregatorsMap();
+    return appAggregatorsMap;
+  }
 
   private void initSystemCredentials() {
     NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
@@ -501,6 +527,16 @@ public class NodeHeartbeatResponsePBImpl extends
       this.systemCredentials.put(appId, byteBuffer);
     }
   }
+  
+  private void initAppAggregatorsMap() {
+    NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+    List<AppAggregatorsMapProto> list = p.getAppAggregatorsMapList();
+    this.appAggregatorsMap = new HashMap<ApplicationId, String> ();
+    for (AppAggregatorsMapProto c : list) {
+      ApplicationId appId = convertFromProtoFormat(c.getAppId());
+      this.appAggregatorsMap.put(appId, c.getAppAggregatorAddr());
+    }
+  }
 
   @Override
   public void setSystemCredentialsForApps(
@@ -512,6 +548,17 @@ public class NodeHeartbeatResponsePBImpl extends
     this.systemCredentials = new HashMap<ApplicationId, ByteBuffer>();
     this.systemCredentials.putAll(systemCredentials);
   }
+  
+  @Override
+  public void setAppAggregatorsMap(
+      Map<ApplicationId, String> appAggregatorsMap) {
+    if (appAggregatorsMap == null || appAggregatorsMap.isEmpty()) {
+      return;
+    }
+    maybeInitBuilder();
+    this.appAggregatorsMap = new HashMap<ApplicationId, String>();
+    this.appAggregatorsMap.putAll(appAggregatorsMap);
+  }
 
   @Override
   public long getNextHeartBeatInterval() {

+ 142 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoRequestPBImpl.java

@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
+import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.AppAggregatorsMapPBImpl;
+
+public class ReportNewAggregatorsInfoRequestPBImpl extends
+    ReportNewAggregatorsInfoRequest {
+
+  ReportNewAggregatorsInfoRequestProto proto = 
+      ReportNewAggregatorsInfoRequestProto.getDefaultInstance();
+  
+  ReportNewAggregatorsInfoRequestProto.Builder builder = null;
+  boolean viaProto = false;
+
+  private List<AppAggregatorsMap> aggregatorsList = null;
+
+  public ReportNewAggregatorsInfoRequestPBImpl() {
+    builder = ReportNewAggregatorsInfoRequestProto.newBuilder();
+  }
+
+  public ReportNewAggregatorsInfoRequestPBImpl(
+      ReportNewAggregatorsInfoRequestProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public ReportNewAggregatorsInfoRequestProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+  
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null)
+      return false;
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto)
+      maybeInitBuilder();
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void mergeLocalToBuilder() {
+    if (aggregatorsList != null) {
+      addLocalAggregatorsToProto();
+    }
+  }
+  
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = ReportNewAggregatorsInfoRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  private void addLocalAggregatorsToProto() {
+    maybeInitBuilder();
+    builder.clearAppAggregators();
+    List<AppAggregatorsMapProto> protoList =
+        new ArrayList<AppAggregatorsMapProto>();
+    for (AppAggregatorsMap m : this.aggregatorsList) {
+      protoList.add(convertToProtoFormat(m));
+    }
+    builder.addAllAppAggregators(protoList);
+  }
+
+  private void initLocalAggregatorsList() {
+    ReportNewAggregatorsInfoRequestProtoOrBuilder p = viaProto ? proto : builder;
+    List<AppAggregatorsMapProto> aggregatorsList =
+        p.getAppAggregatorsList();
+    this.aggregatorsList = new ArrayList<AppAggregatorsMap>();
+    for (AppAggregatorsMapProto m : aggregatorsList) {
+      this.aggregatorsList.add(convertFromProtoFormat(m));
+    }
+  }
+
+  @Override
+  public List<AppAggregatorsMap> getAppAggregatorsList() {  
+    if (this.aggregatorsList == null) {
+      initLocalAggregatorsList();
+    }
+    return this.aggregatorsList;
+  }
+
+  @Override
+  public void setAppAggregatorsList(List<AppAggregatorsMap> appAggregatorsList) {
+    maybeInitBuilder();
+    if (appAggregatorsList == null) {
+      builder.clearAppAggregators();
+    }
+    this.aggregatorsList = appAggregatorsList;
+  }
+  
+  private AppAggregatorsMapPBImpl convertFromProtoFormat(
+      AppAggregatorsMapProto p) {
+    return new AppAggregatorsMapPBImpl(p);
+  }
+
+  private AppAggregatorsMapProto convertToProtoFormat(
+      AppAggregatorsMap m) {
+    return ((AppAggregatorsMapPBImpl) m).getProto();
+  }
+
+}

+ 74 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoResponsePBImpl.java

@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoResponseProto;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
+
+import com.google.protobuf.TextFormat;
+
+@Private
+@Unstable
+public class ReportNewAggregatorsInfoResponsePBImpl extends
+    ReportNewAggregatorsInfoResponse {
+
+  ReportNewAggregatorsInfoResponseProto proto = 
+      ReportNewAggregatorsInfoResponseProto.getDefaultInstance();
+  
+  ReportNewAggregatorsInfoResponseProto.Builder builder = null;
+  
+  boolean viaProto = false;
+  
+  public ReportNewAggregatorsInfoResponsePBImpl() {
+    builder = ReportNewAggregatorsInfoResponseProto.newBuilder();
+  }
+
+  public ReportNewAggregatorsInfoResponsePBImpl(ReportNewAggregatorsInfoResponseProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+  
+  public ReportNewAggregatorsInfoResponseProto getProto() {
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null)
+      return false;
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+}

+ 33 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppAggregatorsMap.java

@@ -0,0 +1,33 @@
+package org.apache.hadoop.yarn.server.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.Records;
+
+
+@Private
+public abstract class AppAggregatorsMap {
+
+  public static AppAggregatorsMap newInstance(
+      ApplicationId id, String aggregatorAddr) {
+    AppAggregatorsMap appAggregatorMap =
+        Records.newRecord(AppAggregatorsMap.class);
+    appAggregatorMap.setApplicationId(id);
+    appAggregatorMap.setAggregatorAddr(aggregatorAddr);
+    return appAggregatorMap;
+  }
+  
+  public abstract ApplicationId getApplicationId();
+  
+  public abstract void setApplicationId(
+      ApplicationId id);
+  
+  public abstract String getAggregatorAddr();
+  
+  public abstract void setAggregatorAddr(
+      String addr);
+
+}

+ 151 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppAggregatorsMapPBImpl.java

@@ -0,0 +1,151 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.api.records.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
+
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProtoOrBuilder;
+
+import com.google.protobuf.TextFormat;
+
+@Private
+@Unstable
+public class AppAggregatorsMapPBImpl extends AppAggregatorsMap {
+
+  AppAggregatorsMapProto proto = 
+      AppAggregatorsMapProto.getDefaultInstance();
+  
+  AppAggregatorsMapProto.Builder builder = null;
+  boolean viaProto = false;
+  
+  private ApplicationId appId = null;
+  private String aggregatorAddr = null;
+  
+  public AppAggregatorsMapPBImpl() {
+    builder = AppAggregatorsMapProto.newBuilder();
+  }
+
+  public AppAggregatorsMapPBImpl(AppAggregatorsMapProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+  
+  public AppAggregatorsMapProto getProto() {
+      mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null)
+      return false;
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+  
+  @Override
+  public ApplicationId getApplicationId() {
+    AppAggregatorsMapProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.appId == null && p.hasAppId()) {
+      this.appId = convertFromProtoFormat(p.getAppId());
+    }
+    return this.appId;
+  }
+  
+  @Override
+  public String getAggregatorAddr() {
+    AppAggregatorsMapProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.aggregatorAddr == null 
+        && p.hasAppAggregatorAddr()) {
+      this.aggregatorAddr = p.getAppAggregatorAddr();
+    }
+    return this.aggregatorAddr;
+  }
+
+  @Override
+  public void setApplicationId(ApplicationId appId) {
+    maybeInitBuilder();
+    if (appId == null) {
+      builder.clearAppId();
+    }
+    this.appId = appId;
+  }
+  
+  @Override
+  public void setAggregatorAddr(String aggregatorAddr) {
+    maybeInitBuilder();
+    if (aggregatorAddr == null) {
+      builder.clearAppAggregatorAddr();
+    }
+    this.aggregatorAddr = aggregatorAddr;
+  }
+  
+  private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
+    return new ApplicationIdPBImpl(p);
+  }
+  
+  private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+    return ((ApplicationIdPBImpl) t).getProto();
+  }
+  
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = AppAggregatorsMapProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+  
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+  
+  private void mergeLocalToBuilder() {
+    if (this.appId != null) {
+      builder.setAppId(convertToProtoFormat(this.appId));
+    }
+    if (this.aggregatorAddr != null) {
+      builder.setAppAggregatorAddr(this.aggregatorAddr);
+    }
+  }
+
+}

+ 29 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/aggregatornodemanager_protocol.proto

@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.yarn.proto";
+option java_outer_classname = "AggregatorNodemanagerProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.yarn;
+
+import "yarn_server_common_service_protos.proto";
+
+service AggregatorNodemanagerProtocolService {
+  rpc reportNewAggregatorInfo (ReportNewAggregatorsInfoRequestProto) returns (ReportNewAggregatorsInfoResponseProto);
+}

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto

@@ -63,6 +63,7 @@ message NodeHeartbeatRequestProto {
   optional MasterKeyProto last_known_nm_token_master_key = 3;
   optional NodeLabelsProto nodeLabels = 4;
   repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5;
+  repeated AppAggregatorsMapProto registered_aggregators = 6;
 }
 
 message LogAggregationReportProto {
@@ -85,6 +86,7 @@ message NodeHeartbeatResponseProto {
   optional bool areNodeLabelsAcceptedByRM = 11 [default = false];
   repeated ContainerProto containers_to_decrease = 12;
   repeated SignalContainerRequestProto containers_to_signal = 13;
+  repeated AppAggregatorsMapProto app_aggregators_map = 14;
 }
 
 message SystemCredentialsForAppsProto {
@@ -92,6 +94,25 @@ message SystemCredentialsForAppsProto {
   optional bytes credentialsForApp = 2;
 }
 
+////////////////////////////////////////////////////////////////////////
+////// From aggregator_nodemanager_protocol ////////////////////////////
+////////////////////////////////////////////////////////////////////////
+message AppAggregatorsMapProto {
+  optional ApplicationIdProto appId = 1;
+  optional string appAggregatorAddr = 2;
+}
+
+//////////////////////////////////////////////////////
+/////// aggregator_nodemanager_protocol //////////////
+//////////////////////////////////////////////////////
+message ReportNewAggregatorsInfoRequestProto {
+  repeated AppAggregatorsMapProto app_aggregators = 1;
+}
+
+message ReportNewAggregatorsInfoResponseProto {
+}
+
+
 message NMContainerStatusProto {
   optional ContainerIdProto container_id = 1;
   optional ContainerStateProto container_state = 2;

+ 100 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java

@@ -61,6 +61,10 @@ import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
+import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Test;
@@ -71,6 +75,14 @@ public class TestRPC {
   private static final String EXCEPTION_CAUSE = "exception cause";
   private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   
+  public static final String ILLEGAL_NUMBER_MESSAGE = 
+      "aggregators' number in ReportNewAggregatorsInfoRequest is not ONE.";
+  
+  public static final String DEFAULT_AGGREGATOR_ADDR = "localhost:0";
+  
+  public static final ApplicationId DEFAULT_APP_ID = 
+      ApplicationId.newInstance(0, 0);
+  
   @Test
   public void testUnknownCall() {
     Configuration conf = new Configuration();
@@ -98,8 +110,66 @@ public class TestRPC {
               + "\\$ApplicationClientProtocolService\\$BlockingInterface protocol."));
     } catch (Exception e) {
       e.printStackTrace();
+    } finally {
+      server.stop();
     }
   }
+  
+  @Test
+  public void testRPCOnAggregatorNodeManagerProtocol() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
+        .getName());
+    YarnRPC rpc = YarnRPC.create(conf);
+    String bindAddr = "localhost:0";
+    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
+    Server server = rpc.getServer(AggregatorNodemanagerProtocol.class,
+        new DummyNMAggregatorService(), addr, conf, null, 1);
+    server.start();
+
+    // Test unrelated protocol wouldn't get response
+    ApplicationClientProtocol unknownProxy = (ApplicationClientProtocol) rpc.getProxy(
+        ApplicationClientProtocol.class, NetUtils.getConnectAddress(server), conf);
+
+    try {
+      unknownProxy.getNewApplication(Records
+          .newRecord(GetNewApplicationRequest.class));
+      Assert.fail("Excepted RPC call to fail with unknown method.");
+    } catch (YarnException e) {
+      Assert.assertTrue(e.getMessage().matches(
+          "Unknown method getNewApplication called on.*"
+              + "org.apache.hadoop.yarn.proto.ApplicationClientProtocol"
+              + "\\$ApplicationClientProtocolService\\$BlockingInterface protocol."));
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    
+    // Test AggregatorNodemanagerProtocol get proper response
+    AggregatorNodemanagerProtocol proxy = (AggregatorNodemanagerProtocol)rpc.getProxy(
+        AggregatorNodemanagerProtocol.class, NetUtils.getConnectAddress(server), conf);
+    // Verify request with DEFAULT_APP_ID and DEFAULT_AGGREGATOR_ADDR get 
+    // normally response.
+    try {
+      ReportNewAggregatorsInfoRequest request = 
+          ReportNewAggregatorsInfoRequest.newInstance(
+              DEFAULT_APP_ID, DEFAULT_AGGREGATOR_ADDR);
+      proxy.reportNewAggregatorInfo(request);
+    } catch (YarnException e) {
+      Assert.fail("RPC call failured is not expected here.");
+    }
+    
+    // Verify empty request get YarnException back (by design in 
+    // DummyNMAggregatorService)
+    try {
+      proxy.reportNewAggregatorInfo(Records
+          .newRecord(ReportNewAggregatorsInfoRequest.class));
+      Assert.fail("Excepted RPC call to fail with YarnException.");
+    } catch (YarnException e) {
+      Assert.assertTrue(e.getMessage().contains(ILLEGAL_NUMBER_MESSAGE));
+    }
+    
+    server.stop();
+  }
 
   @Test
   public void testHadoopProtoRPC() throws Exception {
@@ -167,10 +237,10 @@ public class TestRPC {
       System.out.println("Test Exception is " + e.getMessage());
     } catch (Exception ex) {
       ex.printStackTrace();
+    } finally {
+      server.stop();
     }
     Assert.assertTrue(exception);
-    
-    server.stop();
     Assert.assertNotNull(statuses.get(0));
     Assert.assertEquals(ContainerState.RUNNING, statuses.get(0).getState());
   }
@@ -252,4 +322,32 @@ public class TestRPC {
             .buildTokenService(addr).toString());
     return containerToken;
   }
+  
+  // A dummy implementation for AggregatorNodemanagerProtocol for test purpose, 
+  // it only can accept one appID, aggregatorAddr pair or throw exceptions
+  public class DummyNMAggregatorService 
+      implements AggregatorNodemanagerProtocol {
+    
+    @Override
+    public ReportNewAggregatorsInfoResponse reportNewAggregatorInfo(
+        ReportNewAggregatorsInfoRequest request)
+        throws YarnException, IOException {
+      List<AppAggregatorsMap> appAggregators = request.getAppAggregatorsList();
+      if (appAggregators.size() == 1) {
+        // check default appID and aggregatorAddr
+        AppAggregatorsMap appAggregator = appAggregators.get(0);
+        Assert.assertEquals(appAggregator.getApplicationId(), 
+            DEFAULT_APP_ID);
+        Assert.assertEquals(appAggregator.getAggregatorAddr(), 
+            DEFAULT_AGGREGATOR_ADDR);
+      } else {
+        throw new YarnException(ILLEGAL_NUMBER_MESSAGE);
+      }
+      
+      ReportNewAggregatorsInfoResponse response =
+          recordFactory.newRecordInstance(ReportNewAggregatorsInfoResponse.class);
+      return response;
+    }
+  }
+  
 }

+ 17 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java

@@ -25,7 +25,9 @@ import static org.junit.Assert.assertTrue;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -107,11 +109,14 @@ public class TestYarnServerApiClasses {
     original.setLastKnownNMTokenMasterKey(getMasterKey());
     original.setNodeStatus(getNodeStatus());
     original.setNodeLabels(getValidNodeLabels());
+    Map<ApplicationId, String> aggregators = getAggregators();
+    original.setRegisteredAggregators(aggregators);
     NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl(
         original.getProto());
     assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId());
     assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId());
     assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost());
+    assertEquals(aggregators, copy.getRegisteredAggregators());
     // check labels are coming with valid values
     Assert.assertTrue(original.getNodeLabels()
         .containsAll(copy.getNodeLabels()));
@@ -148,6 +153,8 @@ public class TestYarnServerApiClasses {
     original.setNextHeartBeatInterval(1000);
     original.setNodeAction(NodeAction.NORMAL);
     original.setResponseId(100);
+    Map<ApplicationId, String> aggregators = getAggregators();
+    original.setAppAggregatorsMap(aggregators);
 
     NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl(
         original.getProto());
@@ -157,6 +164,7 @@ public class TestYarnServerApiClasses {
     assertEquals(1, copy.getContainerTokenMasterKey().getKeyId());
     assertEquals(1, copy.getNMTokenMasterKey().getKeyId());
     assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());
+    assertEquals(aggregators, copy.getAppAggregatorsMap());
     assertEquals(false, copy.getAreNodeLabelsAcceptedByRM());
    }
 
@@ -336,6 +344,15 @@ public class TestYarnServerApiClasses {
     return nodeLabels;
   }
 
+  private Map<ApplicationId, String> getAggregators() {
+    ApplicationId appID = ApplicationId.newInstance(1L, 1);
+    String aggregatorAddr = "localhost:0";
+    Map<ApplicationId, String> aggregatorMap = 
+        new HashMap<ApplicationId, String>();
+    aggregatorMap.put(appID, aggregatorAddr);
+    return aggregatorMap;
+  }
+  
   private ContainerStatus getContainerStatus(int applicationId,
       int containerID, int appAttemptId) {
     ContainerStatus status = recordFactory

+ 13 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java

@@ -59,6 +59,19 @@ public interface Context {
   ConcurrentMap<ApplicationId, Application> getApplications();
 
   Map<ApplicationId, Credentials> getSystemCredentialsForApps();
+  
+  /**
+   * Get the registered aggregators that located on this NM. 
+   * @return registered
+   */
+  Map<ApplicationId, String> getRegisteredAggregators();
+  
+  /**
+   * Return the known aggregators which get from RM for all active applications
+   * running on this NM.
+   * @return known aggregators.
+   */
+  Map<ApplicationId, String> getKnownAggregators();
 
   ConcurrentMap<ContainerId, Container> getContainers();
 

+ 43 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java

@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.nodemanager.aggregatormanager.NMAggregatorService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -95,6 +96,7 @@ public class NodeManager extends CompositeService
   private Context context;
   private AsyncDispatcher dispatcher;
   private ContainerManagerImpl containerManager;
+  private NMAggregatorService nmAggregatorService;
   private NodeStatusUpdater nodeStatusUpdater;
   private NodeResourceMonitor nodeResourceMonitor;
   private static CompositeServiceShutdownHook nodeManagerShutdownHook;
@@ -173,6 +175,10 @@ public class NodeManager extends CompositeService
     return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
       metrics, dirsHandler);
   }
+  
+  protected NMAggregatorService createNMAggregatorService(Context context) {
+    return new NMAggregatorService(context);
+  }
 
   protected WebServer createWebServer(Context nmContext,
       ResourceView resourceView, ApplicationACLsManager aclsManager,
@@ -349,6 +355,9 @@ public class NodeManager extends CompositeService
     metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
 
     DefaultMetricsSystem.initialize("NodeManager");
+    
+    this.nmAggregatorService = createNMAggregatorService(context);
+    addService(nmAggregatorService);
 
     // StatusUpdater should be added last so that it get started last 
     // so that we make sure everything is up before registering with RM. 
@@ -440,6 +449,12 @@ public class NodeManager extends CompositeService
 
     protected final ConcurrentMap<ContainerId, Container> containers =
         new ConcurrentSkipListMap<ContainerId, Container>();
+    
+    protected Map<ApplicationId, String> registeredAggregators =
+        new ConcurrentHashMap<ApplicationId, String>();
+    
+    protected Map<ApplicationId, String> knownAggregators =
+        new ConcurrentHashMap<ApplicationId, String>();
 
     protected final ConcurrentMap<ContainerId,
         org.apache.hadoop.yarn.api.records.Container> increasedContainers =
@@ -585,6 +600,29 @@ public class NodeManager extends CompositeService
         getLogAggregationStatusForApps() {
       return this.logAggregationReportForApps;
     }
+    
+    @Override
+    public Map<ApplicationId, String> getRegisteredAggregators() {
+      return this.registeredAggregators;
+    }
+
+    public void addRegisteredAggregators(
+        Map<ApplicationId, String> newRegisteredAggregators) {
+      this.registeredAggregators.putAll(newRegisteredAggregators);
+      // Update to knownAggregators as well so it can immediately be consumed by 
+      // this NM's TimelineClient.
+      this.knownAggregators.putAll(newRegisteredAggregators);
+    }
+    
+    @Override
+    public Map<ApplicationId, String> getKnownAggregators() {
+      return this.knownAggregators;
+    }
+
+    public void addKnownAggregators(
+        Map<ApplicationId, String> knownAggregators) {
+      this.knownAggregators.putAll(knownAggregators);
+    }
   }
 
 
@@ -661,6 +699,11 @@ public class NodeManager extends CompositeService
   public Context getNMContext() {
     return this.context;
   }
+  
+  // For testing
+  NMAggregatorService getNMAggregatorService() {
+    return this.nmAggregatorService;
+  }
 
   public static void main(String[] args) throws IOException {
     Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());

+ 6 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -725,7 +725,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
                         .getContainerTokenSecretManager().getCurrentKey(),
                     NodeStatusUpdaterImpl.this.context
                         .getNMTokenSecretManager().getCurrentKey(),
-                    nodeLabelsForHeartbeat);
+                    nodeLabelsForHeartbeat,
+                    NodeStatusUpdaterImpl.this.context.getRegisteredAggregators());
 
             if (logAggregationEnabled) {
               // pull log aggregation status for application running in this NM
@@ -820,6 +821,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
               dispatcher.getEventHandler().handle(
                   new CMgrSignalContainersEvent(containersToSignal));
             }
+
+            Map<ApplicationId, String> knownAggregators = response.getAppAggregatorsMap();
+            ((NodeManager.NMContext)context).addKnownAggregators(knownAggregators);
+
           } catch (ConnectException e) {
             //catch and throw the exception if tried MAX wait time to connect RM
             dispatcher.getEventHandler().handle(

+ 113 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/aggregatormanager/NMAggregatorService.java

@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.aggregatormanager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.service.CompositeService;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+
+public class NMAggregatorService extends CompositeService implements 
+    AggregatorNodemanagerProtocol {
+
+  private static final Log LOG = LogFactory.getLog(NMAggregatorService.class);
+
+  final Context context;
+  
+  private Server server;
+
+  public NMAggregatorService(Context context) {
+    
+    super(NMAggregatorService.class.getName());
+    this.context = context;
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    Configuration conf = getConfig();
+    
+    InetSocketAddress aggregatorServerAddress = conf.getSocketAddr(
+        YarnConfiguration.NM_BIND_HOST,
+        YarnConfiguration.NM_AGGREGATOR_SERVICE_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_PORT);
+
+    Configuration serverConf = new Configuration(conf);
+
+    // TODO Security settings.
+    YarnRPC rpc = YarnRPC.create(conf);
+
+    server =
+        rpc.getServer(AggregatorNodemanagerProtocol.class, this, 
+            aggregatorServerAddress, serverConf,
+            this.context.getNMTokenSecretManager(),
+            conf.getInt(YarnConfiguration.NM_AGGREGATOR_SERVICE_THREAD_COUNT, 
+                YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_THREAD_COUNT));
+
+    server.start();
+    // start remaining services
+    super.serviceStart();
+    LOG.info("NMAggregatorService started at " + aggregatorServerAddress);
+  }
+
+  
+  @Override
+  public void serviceStop() throws Exception {
+    if (server != null) {
+      server.stop();
+    }
+    // TODO may cleanup app aggregators running on this NM in future.
+    super.serviceStop();
+  }
+
+  @Override
+  public ReportNewAggregatorsInfoResponse reportNewAggregatorInfo(
+      ReportNewAggregatorsInfoRequest request) throws IOException {
+    List<AppAggregatorsMap> newAggregatorsList = request.getAppAggregatorsList();
+    if (newAggregatorsList != null && !newAggregatorsList.isEmpty()) {
+      Map<ApplicationId, String> newAggregatorsMap = 
+          new HashMap<ApplicationId, String>();
+      for (AppAggregatorsMap aggregator : newAggregatorsList) {
+        newAggregatorsMap.put(aggregator.getApplicationId(), aggregator.getAggregatorAddr());
+      }
+      ((NodeManager.NMContext)context).addRegisteredAggregators(newAggregatorsMap);
+    }
+    
+    return ReportNewAggregatorsInfoResponse.newInstance();
+  }
+  
+}

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java

@@ -424,6 +424,10 @@ public class ApplicationImpl implements Application {
           new LogHandlerAppFinishedEvent(app.appId));
 
       app.context.getNMTokenSecretManager().appFinished(app.getAppId());
+      // Remove aggregator info for finished apps.
+      // TODO check we remove related aggregators info in failure cases (YARN-3038)
+      app.context.getRegisteredAggregators().remove(app.getAppId());
+      app.context.getKnownAggregators().remove(app.getAppId());
     }
   }
 

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java

@@ -614,6 +614,16 @@ public abstract class BaseAMRMProxyTest {
       return null;
     }
 
+    @Override
+    public Map<ApplicationId, String> getRegisteredAggregators() {
+      return null;
+    }
+
+    @Override
+    public Map<ApplicationId, String> getKnownAggregators() {
+      return null;
+    }
+
     @Override
     public ConcurrentMap<ContainerId, Container> getContainers() {
       return null;

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

@@ -293,6 +293,8 @@ public class ApplicationMasterService extends AbstractService implements
 
     RMApp rmApp =
         rmContext.getRMApps().get(applicationAttemptId.getApplicationId());
+    // Remove aggregator address when app get finished.
+    rmApp.removeAggregatorAddr();
     // checking whether the app exits in RMStateStore at first not to throw
     // ApplicationDoesNotExistInCacheException before and after
     // RM work-preserving restart.
@@ -558,6 +560,10 @@ public class ApplicationMasterService extends AbstractService implements
       allocateResponse.setDecreasedContainers(allocation.getDecreasedContainers());
 
       allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
+      
+      // add aggregator address for this application
+      allocateResponse.setAggregatorAddr(
+          this.rmContext.getRMApps().get(applicationId).getAggregatorAddr());
 
       // add preemption to the allocateResponse message (if any)
       allocateResponse

+ 65 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

@@ -22,8 +22,10 @@ import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
@@ -63,6 +65,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppAggregatorUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -446,6 +449,11 @@ public class ResourceTrackerService extends AbstractService implements
       return YarnServerBuilderUtils.newNodeHeartbeatResponse(NodeAction.RESYNC,
           message);
     }
+    
+    // Check & update aggregators info from request.
+    // TODO make sure it won't have race condition issue for AM failed over case
+    // that the older registration could possible override the newer one.
+    updateAppAggregatorsMap(request);
 
     // Heartbeat response
     NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
@@ -463,6 +471,14 @@ public class ResourceTrackerService extends AbstractService implements
     if (!systemCredentials.isEmpty()) {
       nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
     }
+    
+    // Return aggregators' map that NM needs to know
+    // TODO we should optimize this to only include aggreator info that NM 
+    // doesn't know yet.
+    List<ApplicationId> keepAliveApps = remoteNodeStatus.getKeepAliveApplications();
+    if (keepAliveApps != null) {
+      setAppAggregatorsMapToResponse(keepAliveApps, nodeHeartBeatResponse);
+    }
 
     // 4. Send status to RMNode, saving the latest response.
     RMNodeStatusEvent nodeStatusEvent =
@@ -490,6 +506,55 @@ public class ResourceTrackerService extends AbstractService implements
 
     return nodeHeartBeatResponse;
   }
+  
+  private void setAppAggregatorsMapToResponse(
+      List<ApplicationId> liveApps, NodeHeartbeatResponse response) {
+    Map<ApplicationId, String> liveAppAggregatorsMap = new 
+        ConcurrentHashMap<ApplicationId, String>();
+    Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps();
+      for (ApplicationId appId : liveApps) {
+        String appAggregatorAddr = rmApps.get(appId).getAggregatorAddr();
+        if (appAggregatorAddr != null) {
+          liveAppAggregatorsMap.put(appId, appAggregatorAddr);
+        } else {
+          // Log a debug info if aggregator address is not found.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Aggregator for applicaton: " + appId + " hasn't registered yet!");
+          }
+        }
+      }
+    response.setAppAggregatorsMap(liveAppAggregatorsMap);
+  }
+  
+  private void updateAppAggregatorsMap(NodeHeartbeatRequest request) {
+    Map<ApplicationId, String> registeredAggregatorsMap = 
+        request.getRegisteredAggregators();
+    if (registeredAggregatorsMap != null 
+        && !registeredAggregatorsMap.isEmpty()) {
+      Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps();
+      for (Map.Entry<ApplicationId, String> entry: 
+          registeredAggregatorsMap.entrySet()) {
+        ApplicationId appId = entry.getKey();
+        String aggregatorAddr = entry.getValue();
+        if (aggregatorAddr != null && !aggregatorAddr.isEmpty()) {
+          RMApp rmApp = rmApps.get(appId);
+          if (rmApp == null) {
+            LOG.warn("Cannot update aggregator info because application ID: " + 
+                appId + " is not found in RMContext!");
+          } else {
+            String previousAggregatorAddr = rmApp.getAggregatorAddr();
+            if (previousAggregatorAddr == null || 
+                previousAggregatorAddr != aggregatorAddr) {
+              // sending aggregator update event.
+              RMAppAggregatorUpdateEvent event =
+                  new RMAppAggregatorUpdateEvent(appId, aggregatorAddr);
+              rmContext.getDispatcher().getEventHandler().handle(event);
+            }
+          }
+        }
+      }
+    }
+  }
 
   /**
    * Check if node in decommissioning state.

+ 17 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java

@@ -175,6 +175,23 @@ public interface RMApp extends EventHandler<RMAppEvent> {
    * @return the tracking url for the application master.
    */
   String getTrackingUrl();
+  
+  /**
+   * The aggregator address for the application.
+   * @return the address for the application's aggregator.
+   */
+  String getAggregatorAddr();
+  
+  /**
+   * Set aggregator address for the application
+   * @param aggregatorAddr the address of aggregator
+   */
+  void setAggregatorAddr(String aggregatorAddr);
+  
+  /**
+   * Remove aggregator address when application is finished or killed.
+   */
+  void removeAggregatorAddr();
 
   /**
    * The original tracking url for the application master.

+ 36 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java

@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+public class RMAppAggregatorUpdateEvent extends RMAppEvent {
+
+  private final String appAggregatorAddr;
+  
+  public RMAppAggregatorUpdateEvent(ApplicationId appId, String appAggregatorAddr) {
+    super(appId, RMAppEventType.AGGREGATOR_UPDATE);
+    this.appAggregatorAddr = appAggregatorAddr;
+  }
+  
+  public String getAppAggregatorAddr(){
+    return this.appAggregatorAddr;
+  }
+  
+}

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java

@@ -30,6 +30,9 @@ public enum RMAppEventType {
 
   // Source: Scheduler
   APP_ACCEPTED,
+  
+  // TODO add source later
+  AGGREGATOR_UPDATE,
 
   // Source: RMAppAttempt
   ATTEMPT_REGISTERED,

+ 50 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -151,6 +151,7 @@ public class RMAppImpl implements RMApp, Recoverable {
   private long storedFinishTime = 0;
   private int firstAttemptIdInStateStore = 1;
   private int nextAttemptId = 1;
+  private String aggregatorAddr;
   // This field isn't protected by readlock now.
   private volatile RMAppAttempt currentAttempt;
   private String queue;
@@ -198,6 +199,8 @@ public class RMAppImpl implements RMApp, Recoverable {
      // Transitions from NEW state
     .addTransition(RMAppState.NEW, RMAppState.NEW,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
+    .addTransition(RMAppState.NEW, RMAppState.NEW,
+        RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
     .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
         RMAppEventType.START, new RMAppNewlySavingTransition())
     .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
@@ -214,6 +217,8 @@ public class RMAppImpl implements RMApp, Recoverable {
     // Transitions from NEW_SAVING state
     .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
+    .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
+        RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
     .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
         RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
     .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
@@ -232,6 +237,8 @@ public class RMAppImpl implements RMApp, Recoverable {
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
     .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
         RMAppEventType.MOVE, new RMAppMoveTransition())
+    .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
+        RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
     .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
         RMAppEventType.APP_REJECTED,
         new FinalSavingTransition(
@@ -248,6 +255,8 @@ public class RMAppImpl implements RMApp, Recoverable {
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
     .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
         RMAppEventType.MOVE, new RMAppMoveTransition())
+    .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
+        RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
     .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
         RMAppEventType.ATTEMPT_REGISTERED)
     .addTransition(RMAppState.ACCEPTED,
@@ -274,6 +283,8 @@ public class RMAppImpl implements RMApp, Recoverable {
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
     .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
         RMAppEventType.MOVE, new RMAppMoveTransition())
+    .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
+        RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
     .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
         RMAppEventType.ATTEMPT_UNREGISTERED,
         new FinalSavingTransition(
@@ -303,6 +314,8 @@ public class RMAppImpl implements RMApp, Recoverable {
     .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, 
         RMAppEventType.APP_RUNNING_ON_NODE,
         new AppRunningOnNodeTransition())
+    .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
+        RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
     // ignorable transitions
     .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
         EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
@@ -314,6 +327,8 @@ public class RMAppImpl implements RMApp, Recoverable {
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, 
         RMAppEventType.APP_RUNNING_ON_NODE,
         new AppRunningOnNodeTransition())
+    .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
+        RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
     // ignorable transitions
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
       EnumSet.of(RMAppEventType.NODE_UPDATE,
@@ -325,6 +340,8 @@ public class RMAppImpl implements RMApp, Recoverable {
     .addTransition(RMAppState.KILLING, RMAppState.KILLING, 
         RMAppEventType.APP_RUNNING_ON_NODE,
         new AppRunningOnNodeTransition())
+    .addTransition(RMAppState.KILLING, RMAppState.KILLING,
+        RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
     .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
         RMAppEventType.ATTEMPT_KILLED,
         new FinalSavingTransition(
@@ -576,6 +593,21 @@ public class RMAppImpl implements RMApp, Recoverable {
   public void setQueue(String queue) {
     this.queue = queue;
   }
+  
+  @Override
+  public String getAggregatorAddr() {
+    return this.aggregatorAddr;
+  }
+  
+  @Override
+  public void setAggregatorAddr(String aggregatorAddr) {
+    this.aggregatorAddr = aggregatorAddr;
+  }
+  
+  @Override
+  public void removeAggregatorAddr() {
+    this.aggregatorAddr = null;
+  }
 
   @Override
   public String getName() {
@@ -848,6 +880,8 @@ public class RMAppImpl implements RMApp, Recoverable {
 
     // send the ATS create Event
     sendATSCreateEvent(this, this.startTime);
+    //TODO recover aggregator address.
+    //this.aggregatorAddr = appState.getAggregatorAddr();
 
     for(int i=0; i<appState.getAttemptCount(); ++i) {
       // create attempt
@@ -902,9 +936,24 @@ public class RMAppImpl implements RMApp, Recoverable {
       SingleArcTransition<RMAppImpl, RMAppEvent> {
     public void transition(RMAppImpl app, RMAppEvent event) {
     };
-
   }
 
+  private static final class RMAppAggregatorUpdateTransition 
+      extends RMAppTransition {
+  
+    public void transition(RMAppImpl app, RMAppEvent event) {
+      LOG.info("Updating aggregator info for app: " + app.getApplicationId());
+    
+      RMAppAggregatorUpdateEvent appAggregatorUpdateEvent = 
+          (RMAppAggregatorUpdateEvent) event;
+      // Update aggregator address
+      app.setAggregatorAddr(appAggregatorUpdateEvent.getAppAggregatorAddr());
+      
+      // TODO persistent to RMStateStore for recover
+      // Save to RMStateStore
+    };
+  }
+  
   private static final class RMAppNodeUpdateTransition extends RMAppTransition {
     public void transition(RMAppImpl app, RMAppEvent event) {
       RMAppNodeUpdateEvent nodeUpdateEvent = (RMAppNodeUpdateEvent) event;

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java

@@ -95,6 +95,18 @@ public abstract class MockAsm extends MockApps {
       throw new UnsupportedOperationException("Not supported yet.");
     }
     @Override
+    public String getAggregatorAddr() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+    @Override
+    public void setAggregatorAddr(String aggregatorAddr) {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+    @Override
+    public void removeAggregatorAddr() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+    @Override
     public ApplicationId getApplicationId() {
       throw new UnsupportedOperationException("Not supported yet.");
     }

+ 14 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java

@@ -301,4 +301,18 @@ public class MockRMApp implements RMApp {
   public CallerContext getCallerContext() {
     throw new UnsupportedOperationException("Not supported yet.");
   }
+
+  public String getAggregatorAddr() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+  
+  @Override
+  public void removeAggregatorAddr() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public void setAggregatorAddr(String aggregatorAddr) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
 }

+ 2 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java

@@ -94,10 +94,9 @@ public class PerNodeTimelineAggregatorsAuxService extends AuxiliaryService {
    * @return whether it was added successfully
    */
   public boolean addApplication(ApplicationId appId) {
-    String appIdString = appId.toString();
     AppLevelTimelineAggregator aggregator =
-        new AppLevelTimelineAggregator(appIdString);
-    return (aggregatorCollection.putIfAbsent(appIdString, aggregator)
+        new AppLevelTimelineAggregator(appId.toString());
+    return (aggregatorCollection.putIfAbsent(appId, aggregator)
         == aggregator);
   }
 

+ 73 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java

@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.yarn.server.timelineservice.aggregator;
 
+import java.io.IOException;
 import java.net.URI;
+import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -30,9 +32,15 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.http.HttpServer2;
 import org.apache.hadoop.http.lib.StaticUserWebFilter;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
 import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@@ -62,6 +70,12 @@ public class TimelineAggregatorsCollection extends CompositeService {
 
   // REST server for this aggregator collection
   private HttpServer2 timelineRestServer;
+  
+  private String timelineRestServerBindAddress;
+  
+  private AggregatorNodemanagerProtocol nmAggregatorService;
+  
+  private InetSocketAddress nmAggregatorServiceAddress;
 
   static final String AGGREGATOR_COLLECTION_ATTR_KEY = "aggregator.collection";
 
@@ -73,6 +87,16 @@ public class TimelineAggregatorsCollection extends CompositeService {
     super(TimelineAggregatorsCollection.class.getName());
   }
 
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    this.nmAggregatorServiceAddress = conf.getSocketAddr(
+        YarnConfiguration.NM_BIND_HOST,
+        YarnConfiguration.NM_AGGREGATOR_SERVICE_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_PORT);
+    
+  }
+  
   @Override
   protected void serviceStart() throws Exception {
     startWebApp();
@@ -95,9 +119,13 @@ public class TimelineAggregatorsCollection extends CompositeService {
    * starting the app level service
    * @return the aggregator associated with id after the potential put.
    */
-  public TimelineAggregator putIfAbsent(String id, TimelineAggregator aggregator) {
+  public TimelineAggregator putIfAbsent(ApplicationId appId, 
+      TimelineAggregator aggregator) {
+    String id = appId.toString();
+    TimelineAggregator aggregatorInTable;
+    boolean aggregatorIsNew = false;
     synchronized (aggregators) {
-      TimelineAggregator aggregatorInTable = aggregators.get(id);
+      aggregatorInTable = aggregators.get(id);
       if (aggregatorInTable == null) {
         try {
           // initialize, start, and add it to the collection so it can be
@@ -106,16 +134,30 @@ public class TimelineAggregatorsCollection extends CompositeService {
           aggregator.start();
           aggregators.put(id, aggregator);
           LOG.info("the aggregator for " + id + " was added");
-          return aggregator;
+          aggregatorInTable = aggregator;
+          aggregatorIsNew = true;
         } catch (Exception e) {
           throw new YarnRuntimeException(e);
         }
       } else {
         String msg = "the aggregator for " + id + " already exists!";
         LOG.error(msg);
-        return aggregatorInTable;
+      }
+      
+    }
+    // Report to NM if a new aggregator is added.
+    if (aggregatorIsNew) {
+      try {
+        reportNewAggregatorToNM(appId);
+      } catch (Exception e) {
+        // throw exception here as it cannot be used if failed report to NM
+        LOG.error("Failed to report a new aggregator for application: " + appId + 
+            " to NM Aggregator Services.");
+        throw new YarnRuntimeException(e);
       }
     }
+    
+    return aggregatorInTable;
   }
 
   /**
@@ -167,7 +209,10 @@ public class TimelineAggregatorsCollection extends CompositeService {
     String bindAddress = WebAppUtils.getWebAppBindURL(conf,
         YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
         WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
-    LOG.info("Instantiating the per-node aggregator webapp at " + bindAddress);
+    this.timelineRestServerBindAddress = WebAppUtils.getResolvedAddress(
+        NetUtils.createSocketAddr(bindAddress));
+    LOG.info("Instantiating the per-node aggregator webapp at " + 
+        timelineRestServerBindAddress);
     try {
       Configuration confForInfoServer = new Configuration(conf);
       confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
@@ -200,4 +245,27 @@ public class TimelineAggregatorsCollection extends CompositeService {
       throw new YarnRuntimeException(msg, e);
     }
   }
+  
+  private void reportNewAggregatorToNM(ApplicationId appId) 
+      throws YarnException, IOException {
+    this.nmAggregatorService = getNMAggregatorService();
+    ReportNewAggregatorsInfoRequest request = 
+        ReportNewAggregatorsInfoRequest.newInstance(appId,
+            this.timelineRestServerBindAddress);
+    LOG.info("Report a new aggregator for application: " + appId + 
+        " to NM Aggregator Services.");
+    nmAggregatorService.reportNewAggregatorInfo(request);
+  }
+  
+  // protected for test
+  protected AggregatorNodemanagerProtocol getNMAggregatorService(){
+    Configuration conf = getConfig();
+    final YarnRPC rpc = YarnRPC.create(conf);
+    
+    // TODO Security settings.
+    return (AggregatorNodemanagerProtocol) rpc.getProxy(
+        AggregatorNodemanagerProtocol.class,
+        nmAggregatorServiceAddress, conf);
+  }
+  
 }

+ 6 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java

@@ -32,6 +32,7 @@ import java.util.concurrent.Future;
 
 import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.junit.Test;
 
 public class TestTimelineAggregatorsCollection {
@@ -45,11 +46,11 @@ public class TestTimelineAggregatorsCollection {
     final int NUM_APPS = 5;
     List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
     for (int i = 0; i < NUM_APPS; i++) {
-      final String appId = String.valueOf(i);
+      final ApplicationId appId = ApplicationId.newInstance(0L, i);
       Callable<Boolean> task = new Callable<Boolean>() {
         public Boolean call() {
           AppLevelTimelineAggregator aggregator =
-              new AppLevelTimelineAggregator(appId);
+              new AppLevelTimelineAggregator(appId.toString());
           return (aggregatorCollection.putIfAbsent(appId, aggregator) == aggregator);
         }
       };
@@ -79,14 +80,14 @@ public class TestTimelineAggregatorsCollection {
     final int NUM_APPS = 5;
     List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
     for (int i = 0; i < NUM_APPS; i++) {
-      final String appId = String.valueOf(i);
+      final ApplicationId appId = ApplicationId.newInstance(0L, i);
       Callable<Boolean> task = new Callable<Boolean>() {
         public Boolean call() {
           AppLevelTimelineAggregator aggregator =
-              new AppLevelTimelineAggregator(appId);
+              new AppLevelTimelineAggregator(appId.toString());
           boolean successPut =
               (aggregatorCollection.putIfAbsent(appId, aggregator) == aggregator);
-          return successPut && aggregatorCollection.remove(appId);
+          return successPut && aggregatorCollection.remove(appId.toString());
         }
       };
       tasks.add(task);