فهرست منبع

YARN-5638. Introduce a collector timestamp to uniquely identify collectors creation order in collector discovery. Contributed by Li Lu.

Sangjin Lee 8 سال پیش
والد
کامیت
78b7e070d8
28فایلهای تغییر یافته به همراه481 افزوده شده و 294 حذف شده
  1. 8 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
  2. 4 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
  3. 5 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java
  4. 25 22
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
  5. 22 15
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
  6. 18 18
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java
  7. 104 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java
  8. 0 46
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java
  9. 62 14
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java
  10. 19 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/package-info.java
  11. 8 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
  12. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
  13. 12 10
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
  14. 11 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
  15. 11 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
  16. 32 24
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
  17. 18 11
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
  18. 22 11
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
  19. 6 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
  20. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
  21. 5 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
  22. 37 21
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
  23. 11 19
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
  24. 0 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
  25. 8 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
  26. 24 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
  27. 2 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
  28. 2 10
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java

+ 8 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java

@@ -24,6 +24,7 @@ import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.util.Records;
@@ -47,7 +48,7 @@ public abstract class NodeHeartbeatRequest {
   public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
       MasterKey lastKnownContainerTokenMasterKey,
       MasterKey lastKnownNMTokenMasterKey, Set<NodeLabel> nodeLabels,
-      Map<ApplicationId, String> registeredCollectors) {
+      Map<ApplicationId, AppCollectorData> registeringCollectors) {
     NodeHeartbeatRequest nodeHeartbeatRequest =
         Records.newRecord(NodeHeartbeatRequest.class);
     nodeHeartbeatRequest.setNodeStatus(nodeStatus);
@@ -56,7 +57,7 @@ public abstract class NodeHeartbeatRequest {
     nodeHeartbeatRequest
         .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey);
     nodeHeartbeatRequest.setNodeLabels(nodeLabels);
-    nodeHeartbeatRequest.setRegisteredCollectors(registeredCollectors);
+    nodeHeartbeatRequest.setRegisteringCollectors(registeringCollectors);
     return nodeHeartbeatRequest;
   }
 
@@ -79,7 +80,9 @@ public abstract class NodeHeartbeatRequest {
       List<LogAggregationReport> logAggregationReportsForApps);
 
   // This tells RM registered collectors' address info on this node
-  public abstract Map<ApplicationId, String> getRegisteredCollectors();
-  public abstract void setRegisteredCollectors(Map<ApplicationId,
-      String> appCollectorsMap);
+  public abstract Map<ApplicationId, AppCollectorData>
+      getRegisteringCollectors();
+
+  public abstract void setRegisteringCollectors(Map<ApplicationId,
+      AppCollectorData> appCollectorsMap);
 }

+ 4 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
@@ -47,10 +48,9 @@ public abstract class NodeHeartbeatResponse {
   public abstract List<ApplicationId> getApplicationsToCleanup();
 
   // This tells NM the collectors' address info of related apps
-  public abstract Map<ApplicationId, String> getAppCollectorsMap();
-
-  public abstract void setAppCollectorsMap(
-      Map<ApplicationId, String> appCollectorsMap);
+  public abstract Map<ApplicationId, AppCollectorData> getAppCollectors();
+  public abstract void setAppCollectors(
+      Map<ApplicationId, AppCollectorData> appCollectorsMap);
 
   public abstract void setResponseId(int responseId);
 

+ 5 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java

@@ -22,14 +22,14 @@ import java.util.Arrays;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.util.Records;
 
 @Private
 public abstract class ReportNewCollectorInfoRequest {
 
   public static ReportNewCollectorInfoRequest newInstance(
-      List<AppCollectorsMap> appCollectorsList) {
+      List<AppCollectorData> appCollectorsList) {
     ReportNewCollectorInfoRequest request =
         Records.newRecord(ReportNewCollectorInfoRequest.class);
     request.setAppCollectorsList(appCollectorsList);
@@ -41,13 +41,13 @@ public abstract class ReportNewCollectorInfoRequest {
     ReportNewCollectorInfoRequest request =
         Records.newRecord(ReportNewCollectorInfoRequest.class);
     request.setAppCollectorsList(
-        Arrays.asList(AppCollectorsMap.newInstance(id, collectorAddr)));
+        Arrays.asList(AppCollectorData.newInstance(id, collectorAddr)));
     return request;
   }
 
-  public abstract List<AppCollectorsMap> getAppCollectorsList();
+  public abstract List<AppCollectorData> getAppCollectorsList();
 
   public abstract void setAppCollectorsList(
-      List<AppCollectorsMap> appCollectorsList);
+      List<AppCollectorData> appCollectorsList);
 
 }

+ 25 - 22
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java

@@ -35,13 +35,14 @@ import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
@@ -58,7 +59,7 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
   private Set<NodeLabel> labels = null;
   private List<LogAggregationReport> logAggregationReportsForApps = null;
 
-  private Map<ApplicationId, String> registeredCollectors = null;
+  private Map<ApplicationId, AppCollectorData> registeringCollectors = null;
 
   public NodeHeartbeatRequestPBImpl() {
     builder = NodeHeartbeatRequestProto.newBuilder();
@@ -114,8 +115,8 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
     if (this.logAggregationReportsForApps != null) {
       addLogAggregationStatusForAppsToProto();
     }
-    if (this.registeredCollectors != null) {
-      addRegisteredCollectorsToProto();
+    if (this.registeringCollectors != null) {
+      addRegisteringCollectorsToProto();
     }
   }
 
@@ -158,14 +159,14 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
     return ((LogAggregationReportPBImpl) value).getProto();
   }
 
-  private void addRegisteredCollectorsToProto() {
+  private void addRegisteringCollectorsToProto() {
     maybeInitBuilder();
-    builder.clearRegisteredCollectors();
-    for (Map.Entry<ApplicationId, String> entry :
-        registeredCollectors.entrySet()) {
-      builder.addRegisteredCollectors(AppCollectorsMapProto.newBuilder()
+    builder.clearRegisteringCollectors();
+    for (Map.Entry<ApplicationId, AppCollectorData> entry :
+        registeringCollectors.entrySet()) {
+      builder.addRegisteringCollectors(AppCollectorDataProto.newBuilder()
           .setAppId(convertToProtoFormat(entry.getKey()))
-          .setAppCollectorAddr(entry.getValue()));
+          .setAppCollectorAddr(entry.getValue().getCollectorAddr()));
     }
   }
 
@@ -251,35 +252,37 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
   }
 
   @Override
-  public Map<ApplicationId, String> getRegisteredCollectors() {
-    if (this.registeredCollectors != null) {
-      return this.registeredCollectors;
+  public Map<ApplicationId, AppCollectorData> getRegisteringCollectors() {
+    if (this.registeringCollectors != null) {
+      return this.registeringCollectors;
     }
     initRegisteredCollectors();
-    return registeredCollectors;
+    return registeringCollectors;
   }
 
   private void initRegisteredCollectors() {
     NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
-    List<AppCollectorsMapProto> list = p.getRegisteredCollectorsList();
+    List<AppCollectorDataProto> list = p.getRegisteringCollectorsList();
     if (!list.isEmpty()) {
-      this.registeredCollectors = new HashMap<>();
-      for (AppCollectorsMapProto c : list) {
+      this.registeringCollectors = new HashMap<>();
+      for (AppCollectorDataProto c : list) {
         ApplicationId appId = convertFromProtoFormat(c.getAppId());
-        this.registeredCollectors.put(appId, c.getAppCollectorAddr());
+        AppCollectorData data = AppCollectorData.newInstance(appId,
+            c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion());
+        this.registeringCollectors.put(appId, data);
       }
     }
   }
 
   @Override
-  public void setRegisteredCollectors(
-      Map<ApplicationId, String> registeredCollectors) {
+  public void setRegisteringCollectors(
+      Map<ApplicationId, AppCollectorData> registeredCollectors) {
     if (registeredCollectors == null || registeredCollectors.isEmpty()) {
       return;
     }
     maybeInitBuilder();
-    this.registeredCollectors = new HashMap<ApplicationId, String>();
-    this.registeredCollectors.putAll(registeredCollectors);
+    this.registeringCollectors = new HashMap<>();
+    this.registeringCollectors.putAll(registeredCollectors);
   }
 
   private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto p) {

+ 22 - 15
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java

@@ -45,11 +45,12 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueui
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
@@ -70,7 +71,7 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
   private List<ApplicationId> applicationsToCleanup = null;
   private Map<ApplicationId, ByteBuffer> systemCredentials = null;
   private Resource resource = null;
-  private Map<ApplicationId, String> appCollectorsMap = null;
+  private Map<ApplicationId, AppCollectorData> appCollectorsMap = null;
 
   private MasterKey containerTokenMasterKey = null;
   private MasterKey nmTokenMasterKey = null;
@@ -146,11 +147,15 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
 
   private void addAppCollectorsMapToProto() {
     maybeInitBuilder();
-    builder.clearAppCollectorsMap();
-    for (Map.Entry<ApplicationId, String> entry : appCollectorsMap.entrySet()) {
-      builder.addAppCollectorsMap(AppCollectorsMapProto.newBuilder()
+    builder.clearAppCollectors();
+    for (Map.Entry<ApplicationId, AppCollectorData> entry
+        : appCollectorsMap.entrySet()) {
+      AppCollectorData data = entry.getValue();
+      builder.addAppCollectors(AppCollectorDataProto.newBuilder()
           .setAppId(convertToProtoFormat(entry.getKey()))
-          .setAppCollectorAddr(entry.getValue()));
+          .setAppCollectorAddr(data.getCollectorAddr())
+          .setRmIdentifier(data.getRMIdentifier())
+          .setVersion(data.getVersion()));
     }
   }
 
@@ -568,7 +573,7 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
   }
 
   @Override
-  public Map<ApplicationId, String> getAppCollectorsMap() {
+  public Map<ApplicationId, AppCollectorData> getAppCollectors() {
     if (this.appCollectorsMap != null) {
       return this.appCollectorsMap;
     }
@@ -589,12 +594,14 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
 
   private void initAppCollectorsMap() {
     NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
-    List<AppCollectorsMapProto> list = p.getAppCollectorsMapList();
+    List<AppCollectorDataProto> list = p.getAppCollectorsList();
     if (!list.isEmpty()) {
       this.appCollectorsMap = new HashMap<>();
-      for (AppCollectorsMapProto c : list) {
+      for (AppCollectorDataProto c : list) {
         ApplicationId appId = convertFromProtoFormat(c.getAppId());
-        this.appCollectorsMap.put(appId, c.getAppCollectorAddr());
+        AppCollectorData data = AppCollectorData.newInstance(appId,
+            c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion());
+        this.appCollectorsMap.put(appId, data);
       }
     }
   }
@@ -611,14 +618,14 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
   }
 
   @Override
-  public void setAppCollectorsMap(
-      Map<ApplicationId, String> appCollectorsMap) {
-    if (appCollectorsMap == null || appCollectorsMap.isEmpty()) {
+  public void setAppCollectors(
+      Map<ApplicationId, AppCollectorData> appCollectors) {
+    if (appCollectors == null || appCollectors.isEmpty()) {
       return;
     }
     maybeInitBuilder();
-    this.appCollectorsMap = new HashMap<ApplicationId, String>();
-    this.appCollectorsMap.putAll(appCollectorsMap);
+    this.appCollectorsMap = new HashMap<>();
+    this.appCollectorsMap.putAll(appCollectors);
   }
 
   @Override

+ 18 - 18
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java

@@ -20,12 +20,12 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProtoOrBuilder;
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
-import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorsMapPBImpl;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorDataPBImpl;
 
 public class ReportNewCollectorInfoRequestPBImpl extends
     ReportNewCollectorInfoRequest {
@@ -36,7 +36,7 @@ public class ReportNewCollectorInfoRequestPBImpl extends
   private ReportNewCollectorInfoRequestProto.Builder builder = null;
   private boolean viaProto = false;
 
-  private List<AppCollectorsMap> collectorsList = null;
+  private List<AppCollectorData> collectorsList = null;
 
   public ReportNewCollectorInfoRequestPBImpl() {
     builder = ReportNewCollectorInfoRequestProto.newBuilder();
@@ -96,9 +96,9 @@ public class ReportNewCollectorInfoRequestPBImpl extends
   private void addLocalCollectorsToProto() {
     maybeInitBuilder();
     builder.clearAppCollectors();
-    List<AppCollectorsMapProto> protoList =
-        new ArrayList<AppCollectorsMapProto>();
-    for (AppCollectorsMap m : this.collectorsList) {
+    List<AppCollectorDataProto> protoList =
+        new ArrayList<AppCollectorDataProto>();
+    for (AppCollectorData m : this.collectorsList) {
       protoList.add(convertToProtoFormat(m));
     }
     builder.addAllAppCollectors(protoList);
@@ -106,16 +106,16 @@ public class ReportNewCollectorInfoRequestPBImpl extends
 
   private void initLocalCollectorsList() {
     ReportNewCollectorInfoRequestProtoOrBuilder p = viaProto ? proto : builder;
-    List<AppCollectorsMapProto> list =
+    List<AppCollectorDataProto> list =
         p.getAppCollectorsList();
-    this.collectorsList = new ArrayList<AppCollectorsMap>();
-    for (AppCollectorsMapProto m : list) {
+    this.collectorsList = new ArrayList<AppCollectorData>();
+    for (AppCollectorDataProto m : list) {
       this.collectorsList.add(convertFromProtoFormat(m));
     }
   }
 
   @Override
-  public List<AppCollectorsMap> getAppCollectorsList() {
+  public List<AppCollectorData> getAppCollectorsList() {
     if (this.collectorsList == null) {
       initLocalCollectorsList();
     }
@@ -123,7 +123,7 @@ public class ReportNewCollectorInfoRequestPBImpl extends
   }
 
   @Override
-  public void setAppCollectorsList(List<AppCollectorsMap> appCollectorsList) {
+  public void setAppCollectorsList(List<AppCollectorData> appCollectorsList) {
     maybeInitBuilder();
     if (appCollectorsList == null) {
       builder.clearAppCollectors();
@@ -131,14 +131,14 @@ public class ReportNewCollectorInfoRequestPBImpl extends
     this.collectorsList = appCollectorsList;
   }
 
-  private AppCollectorsMapPBImpl convertFromProtoFormat(
-      AppCollectorsMapProto p) {
-    return new AppCollectorsMapPBImpl(p);
+  private AppCollectorDataPBImpl convertFromProtoFormat(
+      AppCollectorDataProto p) {
+    return new AppCollectorDataPBImpl(p);
   }
 
-  private AppCollectorsMapProto convertToProtoFormat(
-      AppCollectorsMap m) {
-    return ((AppCollectorsMapPBImpl) m).getProto();
+  private AppCollectorDataProto convertToProtoFormat(
+      AppCollectorData m) {
+    return ((AppCollectorDataPBImpl) m).getProto();
   }
 
 }

+ 104 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java

@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.Records;
+
+
+@Private
+@InterfaceStability.Unstable
+public abstract class AppCollectorData {
+
+  protected static final long DEFAULT_TIMESTAMP_VALUE = -1;
+
+  public static AppCollectorData newInstance(
+      ApplicationId id, String collectorAddr, long rmIdentifier, long version) {
+    AppCollectorData appCollectorData =
+        Records.newRecord(AppCollectorData.class);
+    appCollectorData.setApplicationId(id);
+    appCollectorData.setCollectorAddr(collectorAddr);
+    appCollectorData.setRMIdentifier(rmIdentifier);
+    appCollectorData.setVersion(version);
+    return appCollectorData;
+  }
+
+  public static AppCollectorData newInstance(ApplicationId id,
+      String collectorAddr) {
+    return newInstance(id, collectorAddr, DEFAULT_TIMESTAMP_VALUE,
+        DEFAULT_TIMESTAMP_VALUE);
+  }
+
+  /**
+   * Returns if a collector data item happens before another one. Null data
+   * items happens before any other non-null items. Non-null data items A
+   * happens before another non-null item B when A's rmIdentifier is less than
+   * B's rmIdentifier. Or A's version is less than B's if they have the same
+   * rmIdentifier.
+   *
+   * @param dataA first collector data item.
+   * @param dataB second collector data item.
+   * @return true if dataA happens before dataB.
+   */
+  public static boolean happensBefore(AppCollectorData dataA,
+      AppCollectorData dataB) {
+    if (dataA == null && dataB == null) {
+      return false;
+    } else if (dataA == null || dataB == null) {
+      return dataA == null;
+    }
+
+    return
+        (dataA.getRMIdentifier() < dataB.getRMIdentifier())
+        || ((dataA.getRMIdentifier() == dataB.getRMIdentifier())
+            && (dataA.getVersion() < dataB.getVersion()));
+  }
+
+  /**
+   * Returns if the collector data has been stamped by the RM with a RM cluster
+   * timestamp and a version number.
+   *
+   * @return true if RM has already assigned a timestamp for this collector.
+   * Otherwise, it means the RM has not recognized the existence of this
+   * collector.
+   */
+  public boolean isStamped() {
+    return (getRMIdentifier() != DEFAULT_TIMESTAMP_VALUE)
+        || (getVersion() != DEFAULT_TIMESTAMP_VALUE);
+  }
+
+  public abstract ApplicationId getApplicationId();
+
+  public abstract void setApplicationId(ApplicationId id);
+
+  public abstract String getCollectorAddr();
+
+  public abstract void setCollectorAddr(String addr);
+
+  public abstract long getRMIdentifier();
+
+  public abstract void setRMIdentifier(long rmId);
+
+  public abstract long getVersion();
+
+  public abstract void setVersion(long version);
+
+}

+ 0 - 46
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java

@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.api.records;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.util.Records;
-
-
-@Private
-public abstract class AppCollectorsMap {
-
-  public static AppCollectorsMap newInstance(
-      ApplicationId id, String collectorAddr) {
-    AppCollectorsMap appCollectorsMap =
-        Records.newRecord(AppCollectorsMap.class);
-    appCollectorsMap.setApplicationId(id);
-    appCollectorsMap.setCollectorAddr(collectorAddr);
-    return appCollectorsMap;
-  }
-
-  public abstract ApplicationId getApplicationId();
-
-  public abstract void setApplicationId(ApplicationId id);
-
-  public abstract String getCollectorAddr();
-
-  public abstract void setCollectorAddr(String addr);
-
-}

+ 62 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java

@@ -21,37 +21,39 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProtoOrBuilder;
 
 import com.google.protobuf.TextFormat;
 
 @Private
 @Unstable
-public class AppCollectorsMapPBImpl extends AppCollectorsMap {
+public class AppCollectorDataPBImpl extends AppCollectorData {
 
-  private AppCollectorsMapProto proto =
-      AppCollectorsMapProto.getDefaultInstance();
+  private AppCollectorDataProto proto =
+      AppCollectorDataProto.getDefaultInstance();
 
-  private AppCollectorsMapProto.Builder builder = null;
+  private AppCollectorDataProto.Builder builder = null;
   private boolean viaProto = false;
 
   private ApplicationId appId = null;
   private String collectorAddr = null;
+  private Long rmIdentifier = null;
+  private Long version = null;
 
-  public AppCollectorsMapPBImpl() {
-    builder = AppCollectorsMapProto.newBuilder();
+  public AppCollectorDataPBImpl() {
+    builder = AppCollectorDataProto.newBuilder();
   }
 
-  public AppCollectorsMapPBImpl(AppCollectorsMapProto proto) {
+  public AppCollectorDataPBImpl(AppCollectorDataProto proto) {
     this.proto = proto;
     viaProto = true;
   }
 
-  public AppCollectorsMapProto getProto() {
+  public AppCollectorDataProto getProto() {
     mergeLocalToProto();
     proto = viaProto ? proto : builder.build();
     viaProto = true;
@@ -81,7 +83,7 @@ public class AppCollectorsMapPBImpl extends AppCollectorsMap {
 
   @Override
   public ApplicationId getApplicationId() {
-    AppCollectorsMapProtoOrBuilder p = viaProto ? proto : builder;
+    AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder;
     if (this.appId == null && p.hasAppId()) {
       this.appId = convertFromProtoFormat(p.getAppId());
     }
@@ -90,7 +92,7 @@ public class AppCollectorsMapPBImpl extends AppCollectorsMap {
 
   @Override
   public String getCollectorAddr() {
-    AppCollectorsMapProtoOrBuilder p = viaProto ? proto : builder;
+    AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder;
     if (this.collectorAddr == null
         && p.hasAppCollectorAddr()) {
       this.collectorAddr = p.getAppCollectorAddr();
@@ -116,6 +118,46 @@ public class AppCollectorsMapPBImpl extends AppCollectorsMap {
     this.collectorAddr = collectorAddr;
   }
 
+  @Override
+  public long getRMIdentifier() {
+    AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.rmIdentifier == null && p.hasRmIdentifier()) {
+      this.rmIdentifier = p.getRmIdentifier();
+    }
+    if (this.rmIdentifier != null) {
+      return this.rmIdentifier;
+    } else {
+      return AppCollectorData.DEFAULT_TIMESTAMP_VALUE;
+    }
+  }
+
+  @Override
+  public void setRMIdentifier(long rmId) {
+    maybeInitBuilder();
+    this.rmIdentifier = rmId;
+    builder.setRmIdentifier(rmId);
+  }
+
+  @Override
+  public long getVersion() {
+    AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.version == null && p.hasRmIdentifier()) {
+      this.version = p.getRmIdentifier();
+    }
+    if (this.version != null) {
+      return this.version;
+    } else {
+      return AppCollectorData.DEFAULT_TIMESTAMP_VALUE;
+    }
+  }
+
+  @Override
+  public void setVersion(long version) {
+    maybeInitBuilder();
+    this.version = version;
+    builder.setVersion(version);
+  }
+
   private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
     return new ApplicationIdPBImpl(p);
   }
@@ -126,7 +168,7 @@ public class AppCollectorsMapPBImpl extends AppCollectorsMap {
 
   private void maybeInitBuilder() {
     if (viaProto || builder == null) {
-      builder = AppCollectorsMapProto.newBuilder(proto);
+      builder = AppCollectorDataProto.newBuilder(proto);
     }
     viaProto = false;
   }
@@ -147,6 +189,12 @@ public class AppCollectorsMapPBImpl extends AppCollectorsMap {
     if (this.collectorAddr != null) {
       builder.setAppCollectorAddr(this.collectorAddr);
     }
+    if (this.rmIdentifier != null) {
+      builder.setRmIdentifier(this.rmIdentifier);
+    }
+    if (this.version != null) {
+      builder.setVersion(this.version);
+    }
   }
 
 }

+ 19 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/package-info.java

@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** Server records PB implementations. */
+package org.apache.hadoop.yarn.server.api.records.impl.pb;

+ 8 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto

@@ -90,7 +90,7 @@ message NodeHeartbeatRequestProto {
   optional MasterKeyProto last_known_nm_token_master_key = 3;
   optional NodeLabelsProto nodeLabels = 4;
   repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5;
-  repeated AppCollectorsMapProto registered_collectors = 6;
+  repeated AppCollectorDataProto registering_collectors = 6;
 }
 
 message LogAggregationReportProto {
@@ -116,7 +116,7 @@ message NodeHeartbeatResponseProto {
   repeated SignalContainerRequestProto containers_to_signal = 13;
   optional ResourceProto resource = 14;
   optional ContainerQueuingLimitProto container_queuing_limit = 15;
-  repeated AppCollectorsMapProto app_collectors_map = 16;
+  repeated AppCollectorDataProto app_collectors = 16;
   // to be used in place of containers_to_decrease
   repeated ContainerProto containers_to_update = 17;
 }
@@ -134,16 +134,18 @@ message SystemCredentialsForAppsProto {
 ////////////////////////////////////////////////////////////////////////
 ////// From collector_nodemanager_protocol ////////////////////////////
 ////////////////////////////////////////////////////////////////////////
-message AppCollectorsMapProto {
-  optional ApplicationIdProto appId = 1;
-  optional string appCollectorAddr = 2;
+message AppCollectorDataProto {
+  optional ApplicationIdProto app_id = 1;
+  optional string app_collector_addr = 2;
+  optional int64 rm_identifier = 3 [default = -1];
+  optional int64 version = 4 [default = -1];
 }
 
 //////////////////////////////////////////////////////
 /////// collector_nodemanager_protocol //////////////
 //////////////////////////////////////////////////////
 message ReportNewCollectorInfoRequestProto {
-  repeated AppCollectorsMapProto app_collectors = 1;
+  repeated AppCollectorDataProto app_collectors = 1;
 }
 
 message ReportNewCollectorInfoResponseProto {

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java

@@ -77,7 +77,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorCon
 import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Test;
@@ -429,10 +429,10 @@ public class TestRPC {
     public ReportNewCollectorInfoResponse reportNewCollectorInfo(
         ReportNewCollectorInfoRequest request)
         throws YarnException, IOException {
-      List<AppCollectorsMap> appCollectors = request.getAppCollectorsList();
+      List<AppCollectorData> appCollectors = request.getAppCollectorsList();
       if (appCollectors.size() == 1) {
         // check default appID and collectorAddr
-        AppCollectorsMap appCollector = appCollectors.get(0);
+        AppCollectorData appCollector = appCollectors.get(0);
         Assert.assertEquals(appCollector.getApplicationId(),
             DEFAULT_APP_ID);
         Assert.assertEquals(appCollector.getCollectorAddr(),

+ 12 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java

@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRe
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UnRegisterNodeManagerRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
@@ -109,14 +110,14 @@ public class TestYarnServerApiClasses {
     original.setLastKnownNMTokenMasterKey(getMasterKey());
     original.setNodeStatus(getNodeStatus());
     original.setNodeLabels(getValidNodeLabels());
-    Map<ApplicationId, String> collectors = getCollectors();
-    original.setRegisteredCollectors(collectors);
+    Map<ApplicationId, AppCollectorData> collectors = getCollectors();
+    original.setRegisteringCollectors(collectors);
     NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl(
         original.getProto());
     assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId());
     assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId());
     assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost());
-    assertEquals(collectors, copy.getRegisteredCollectors());
+    assertEquals(collectors, copy.getRegisteringCollectors());
     // check labels are coming with valid values
     Assert.assertTrue(original.getNodeLabels()
         .containsAll(copy.getNodeLabels()));
@@ -153,8 +154,8 @@ public class TestYarnServerApiClasses {
     original.setNextHeartBeatInterval(1000);
     original.setNodeAction(NodeAction.NORMAL);
     original.setResponseId(100);
-    Map<ApplicationId, String> collectors = getCollectors();
-    original.setAppCollectorsMap(collectors);
+    Map<ApplicationId, AppCollectorData> collectors = getCollectors();
+    original.setAppCollectors(collectors);
 
     NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl(
         original.getProto());
@@ -164,7 +165,7 @@ public class TestYarnServerApiClasses {
     assertEquals(1, copy.getContainerTokenMasterKey().getKeyId());
     assertEquals(1, copy.getNMTokenMasterKey().getKeyId());
     assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());
-    assertEquals(collectors, copy.getAppCollectorsMap());
+    assertEquals(collectors, copy.getAppCollectors());
     assertEquals(false, copy.getAreNodeLabelsAcceptedByRM());
    }
 
@@ -347,12 +348,13 @@ public class TestYarnServerApiClasses {
     return nodeLabels;
   }
 
-  private Map<ApplicationId, String> getCollectors() {
+  private Map<ApplicationId, AppCollectorData> getCollectors() {
     ApplicationId appID = ApplicationId.newInstance(1L, 1);
     String collectorAddr = "localhost:0";
-    Map<ApplicationId, String> collectorMap =
-        new HashMap<ApplicationId, String>();
-    collectorMap.put(appID, collectorAddr);
+    AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr);
+    Map<ApplicationId, AppCollectorData> collectorMap =
+        new HashMap<>();
+    collectorMap.put(appID, data);
     return collectorMap;
   }
 

+ 11 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@@ -65,11 +66,18 @@ public interface Context {
   Map<ApplicationId, Credentials> getSystemCredentialsForApps();
 
   /**
-   * Get the registered collectors that located on this NM.
-   * @return registered collectors, or null if the timeline service v.2 is not
+   * Get the list of collectors that are registering with the RM from this node.
+   * @return registering collectors, or null if the timeline service v.2 is not
    * enabled
    */
-  Map<ApplicationId, String> getRegisteredCollectors();
+  Map<ApplicationId, AppCollectorData> getRegisteringCollectors();
+
+  /**
+   * Get the list of collectors registered with the RM and known by this node.
+   * @return known collectors, or null if the timeline service v.2 is not
+   * enabled.
+   */
+  Map<ApplicationId, AppCollectorData> getKnownCollectors();
 
   ConcurrentMap<ContainerId, Container> getContainers();
 

+ 11 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java

@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
 import org.apache.hadoop.yarn.server.nodemanager.collectormanager.NMCollectorService;
@@ -492,7 +493,9 @@ public class NodeManager extends CompositeService
     protected final ConcurrentMap<ContainerId, Container> containers =
         new ConcurrentSkipListMap<ContainerId, Container>();
 
-    private Map<ApplicationId, String> registeredCollectors;
+    private Map<ApplicationId, AppCollectorData> registeringCollectors;
+
+    private Map<ApplicationId, AppCollectorData> knownCollectors;
 
     protected final ConcurrentMap<ContainerId,
         org.apache.hadoop.yarn.api.records.Container> increasedContainers =
@@ -526,7 +529,8 @@ public class NodeManager extends CompositeService
         NMStateStoreService stateStore, boolean isDistSchedulingEnabled,
         Configuration conf) {
       if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
-        this.registeredCollectors = new ConcurrentHashMap<>();
+        this.registeringCollectors = new ConcurrentHashMap<>();
+        this.knownCollectors = new ConcurrentHashMap<>();
       }
       this.containerTokenSecretManager = containerTokenSecretManager;
       this.nmTokenSecretManager = nmTokenSecretManager;
@@ -681,18 +685,13 @@ public class NodeManager extends CompositeService
     }
 
     @Override
-    public Map<ApplicationId, String> getRegisteredCollectors() {
-      return this.registeredCollectors;
+    public Map<ApplicationId, AppCollectorData> getRegisteringCollectors() {
+      return this.registeringCollectors;
     }
 
-    public void addRegisteredCollectors(
-        Map<ApplicationId, String> newRegisteredCollectors) {
-      if (registeredCollectors != null) {
-        this.registeredCollectors.putAll(newRegisteredCollectors);
-      } else {
-        LOG.warn("collectors are added when the registered collectors are " +
-            "initialized");
-      }
+    @Override
+    public Map<ApplicationId, AppCollectorData> getKnownCollectors() {
+      return this.knownCollectors;
     }
 
     @Override

+ 32 - 24
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
 
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
@@ -760,7 +761,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   }
 
   protected void startStatusUpdater() {
-
     statusUpdaterRunnable = new StatusUpdaterRunnable();
     statusUpdater =
         new Thread(statusUpdaterRunnable, "Node Status Updater");
@@ -1043,7 +1043,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
                       .getNMTokenSecretManager().getCurrentKey(),
                   nodeLabelsForHeartbeat,
                   NodeStatusUpdaterImpl.this.context
-                      .getRegisteredCollectors());
+                      .getRegisteringCollectors());
 
           if (logAggregationEnabled) {
             // pull log aggregation status for application running in this NM
@@ -1134,7 +1134,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
             }
           }
           if (YarnConfiguration.timelineServiceV2Enabled(context.getConf())) {
-            updateTimelineClientsAddress(response);
+            updateTimelineCollectorData(response);
           }
 
         } catch (ConnectException e) {
@@ -1164,40 +1164,48 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       }
     }
 
-    private void updateTimelineClientsAddress(
+    private void updateTimelineCollectorData(
         NodeHeartbeatResponse response) {
-      Map<ApplicationId, String> knownCollectorsMap =
-          response.getAppCollectorsMap();
-      if (knownCollectorsMap == null) {
+      Map<ApplicationId, AppCollectorData> incomingCollectorsMap =
+          response.getAppCollectors();
+      if (incomingCollectorsMap == null) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("No collectors to update RM");
         }
-      } else {
-        Set<Map.Entry<ApplicationId, String>> rmKnownCollectors =
-            knownCollectorsMap.entrySet();
-        for (Map.Entry<ApplicationId, String> entry : rmKnownCollectors) {
-          ApplicationId appId = entry.getKey();
-          String collectorAddr = entry.getValue();
-
-          // Only handle applications running on local node.
-          // Not include apps with timeline collectors running in local
-          Application application = context.getApplications().get(appId);
-          // TODO this logic could be problematic if the collector address
-          // gets updated due to NM restart or collector service failure
-          if (application != null &&
-              !context.getRegisteredCollectors().containsKey(appId)) {
+        return;
+      }
+      Map<ApplicationId, AppCollectorData> knownCollectors =
+          context.getKnownCollectors();
+      for (Map.Entry<ApplicationId, AppCollectorData> entry
+          : incomingCollectorsMap.entrySet()) {
+        ApplicationId appId = entry.getKey();
+        AppCollectorData collectorData = entry.getValue();
+
+        // Only handle applications running on local node.
+        Application application = context.getApplications().get(appId);
+        if (application != null) {
+          // Update collector data if the newly received data happens after
+          // the known data (updates the known data).
+          AppCollectorData existingData = knownCollectors.get(appId);
+          if (AppCollectorData.happensBefore(existingData, collectorData)) {
             if (LOG.isDebugEnabled()) {
-              LOG.debug("Sync a new collector address: " + collectorAddr +
-                      " for application: " + appId + " from RM.");
+              LOG.debug("Sync a new collector address: "
+                  + collectorData.getCollectorAddr()
+                  + " for application: " + appId + " from RM.");
             }
+            // Update information for clients.
             NMTimelinePublisher nmTimelinePublisher =
                 context.getNMTimelinePublisher();
             if (nmTimelinePublisher != null) {
               nmTimelinePublisher.setTimelineServiceAddress(
-                  application.getAppId(), collectorAddr);
+                  application.getAppId(), collectorData.getCollectorAddr());
             }
+            // Update information for the node manager itself.
+            knownCollectors.put(appId, collectorData);
           }
         }
+        // Remove the registering collector data
+        context.getRegisteringCollectors().remove(entry.getKey());
       }
     }
 

+ 18 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java

@@ -37,9 +37,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorCon
 import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 
@@ -108,23 +107,31 @@ public class NMCollectorService extends CompositeService implements
   @Override
   public ReportNewCollectorInfoResponse reportNewCollectorInfo(
       ReportNewCollectorInfoRequest request) throws YarnException, IOException {
-    List<AppCollectorsMap> newCollectorsList = request.getAppCollectorsList();
+    List<AppCollectorData> newCollectorsList = request.getAppCollectorsList();
     if (newCollectorsList != null && !newCollectorsList.isEmpty()) {
-      Map<ApplicationId, String> newCollectorsMap =
-          new HashMap<ApplicationId, String>();
-      for (AppCollectorsMap collector : newCollectorsList) {
+      Map<ApplicationId, AppCollectorData> newCollectorsMap =
+          new HashMap<>();
+      for (AppCollectorData collector : newCollectorsList) {
         ApplicationId appId = collector.getApplicationId();
-        String collectorAddr = collector.getCollectorAddr();
-        newCollectorsMap.put(appId, collectorAddr);
+        newCollectorsMap.put(appId, collector);
         // set registered collector address to TimelineClient.
+        // TODO: Do we need to do this after we received confirmation from
+        // the RM?
         NMTimelinePublisher nmTimelinePublisher =
             context.getNMTimelinePublisher();
         if (nmTimelinePublisher != null) {
-          nmTimelinePublisher.setTimelineServiceAddress(appId, collectorAddr);
+          nmTimelinePublisher.setTimelineServiceAddress(appId,
+              collector.getCollectorAddr());
         }
       }
-      ((NodeManager.NMContext)context).addRegisteredCollectors(
-          newCollectorsMap);
+      Map<ApplicationId, AppCollectorData> registeringCollectors
+          = context.getRegisteringCollectors();
+      if (registeringCollectors != null) {
+        registeringCollectors.putAll(newCollectorsMap);
+      } else {
+        LOG.warn("collectors are added when the registered collectors are " +
+            "initialized");
+      }
     }
 
     return ReportNewCollectorInfoResponse.newInstance();

+ 22 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java

@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
@@ -558,21 +559,20 @@ public class ApplicationImpl implements Application {
   @SuppressWarnings("unchecked")
   static class AppCompletelyDoneTransition implements
       SingleArcTransition<ApplicationImpl, ApplicationEvent> {
-    @Override
-    public void transition(ApplicationImpl app, ApplicationEvent event) {
 
-      // Inform the logService
-      app.dispatcher.getEventHandler().handle(
-          new LogHandlerAppFinishedEvent(app.appId));
-
-      app.context.getNMTokenSecretManager().appFinished(app.getAppId());
+    private void updateCollectorStatus(ApplicationImpl app) {
       // Remove collectors info for finished apps.
       // TODO check we remove related collectors info in failure cases
       // (YARN-3038)
-      Map<ApplicationId, String> registeredCollectors =
-          app.context.getRegisteredCollectors();
-      if (registeredCollectors != null) {
-        registeredCollectors.remove(app.getAppId());
+      Map<ApplicationId, AppCollectorData> registeringCollectors
+          = app.context.getRegisteringCollectors();
+      if (registeringCollectors != null) {
+        registeringCollectors.remove(app.getAppId());
+      }
+      Map<ApplicationId, AppCollectorData> knownCollectors =
+          app.context.getKnownCollectors();
+      if (knownCollectors != null) {
+        knownCollectors.remove(app.getAppId());
       }
       // stop timelineClient when application get finished.
       NMTimelinePublisher nmTimelinePublisher =
@@ -581,6 +581,17 @@ public class ApplicationImpl implements Application {
         nmTimelinePublisher.stopTimelineClient(app.getAppId());
       }
     }
+
+    @Override
+    public void transition(ApplicationImpl app, ApplicationEvent event) {
+
+      // Inform the logService
+      app.dispatcher.getEventHandler().handle(
+          new LogHandlerAppFinishedEvent(app.appId));
+
+      app.context.getNMTokenSecretManager().appFinished(app.getAppId());
+      updateCollectorStatus(app);
+    }
   }
 
   static class AppLogsAggregatedTransition implements

+ 6 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java

@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -658,8 +659,11 @@ public abstract class BaseAMRMProxyTest {
       return null;
     }
 
-    @Override
-    public Map<ApplicationId, String> getRegisteredCollectors() {
+    public Map<ApplicationId, AppCollectorData> getRegisteringCollectors() {
+      return null;
+    }
+
+    @Override public Map<ApplicationId, AppCollectorData> getKnownCollectors() {
       return null;
     }
 

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
@@ -254,7 +255,7 @@ public class ApplicationMasterService extends AbstractService implements
 
     // Remove collector address when app get finished.
     if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
-      rmApp.removeCollectorAddr();
+      ((RMAppImpl) rmApp).removeCollectorData();
     }
     // checking whether the app exits in RMStateStore at first not to throw
     // ApplicationDoesNotExistInCacheException before and after

+ 5 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java

@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
@@ -293,9 +294,10 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
     // add collector address for this application
     if (YarnConfiguration.timelineServiceV2Enabled(
         getRmContext().getYarnConfiguration())) {
-      response.setCollectorAddr(
-          getRmContext().getRMApps().get(appAttemptId.getApplicationId())
-              .getCollectorAddr());
+      AppCollectorData data = app.getCollectorData();
+      if (data != null) {
+        response.setCollectorAddr(data.getCollectorAddr());
+      }
     }
 
     // add preemption to the allocateResponse message (if any)

+ 37 - 21
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -63,12 +64,14 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -118,6 +121,8 @@ public class ResourceTrackerService extends AbstractService implements
   private boolean isDelegatedCentralizedNodeLabelsConf;
   private DynamicResourceConfiguration drConf;
 
+  private final AtomicLong timelineCollectorVersion = new AtomicLong(0);
+
   public ResourceTrackerService(RMContext rmContext,
       NodesListManager nodesListManager,
       NMLivelinessMonitor nmLivelinessMonitor,
@@ -525,9 +530,6 @@ public class ResourceTrackerService extends AbstractService implements
         YarnConfiguration.timelineServiceV2Enabled(getConfig());
     if (timelineV2Enabled) {
       // Check & update collectors info from request.
-      // TODO make sure it won't have race condition issue for AM failed over
-      // case that the older registration could possible override the newer
-      // one.
       updateAppCollectorsMap(request);
     }
 
@@ -613,14 +615,14 @@ public class ResourceTrackerService extends AbstractService implements
 
   private void setAppCollectorsMapToResponse(
       List<ApplicationId> runningApps, NodeHeartbeatResponse response) {
-    Map<ApplicationId, String> liveAppCollectorsMap = new
-        HashMap<ApplicationId, String>();
+    Map<ApplicationId, AppCollectorData> liveAppCollectorsMap = new
+        HashMap<>();
     Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps();
     // Set collectors for all running apps on this node.
     for (ApplicationId appId : runningApps) {
-      String appCollectorAddr = rmApps.get(appId).getCollectorAddr();
-      if (appCollectorAddr != null) {
-        liveAppCollectorsMap.put(appId, appCollectorAddr);
+      AppCollectorData appCollectorData = rmApps.get(appId).getCollectorData();
+      if (appCollectorData != null) {
+        liveAppCollectorsMap.put(appId, appCollectorData);
       } else {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Collector for applicaton: " + appId +
@@ -628,29 +630,43 @@ public class ResourceTrackerService extends AbstractService implements
         }
       }
     }
-    response.setAppCollectorsMap(liveAppCollectorsMap);
+    response.setAppCollectors(liveAppCollectorsMap);
   }
 
   private void updateAppCollectorsMap(NodeHeartbeatRequest request) {
-    Map<ApplicationId, String> registeredCollectorsMap =
-        request.getRegisteredCollectors();
-    if (registeredCollectorsMap != null
-        && !registeredCollectorsMap.isEmpty()) {
+    Map<ApplicationId, AppCollectorData> registeringCollectorsMap =
+        request.getRegisteringCollectors();
+    if (registeringCollectorsMap != null
+        && !registeringCollectorsMap.isEmpty()) {
       Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps();
-      for (Map.Entry<ApplicationId, String> entry:
-          registeredCollectorsMap.entrySet()) {
+      for (Map.Entry<ApplicationId, AppCollectorData> entry:
+          registeringCollectorsMap.entrySet()) {
         ApplicationId appId = entry.getKey();
-        String collectorAddr = entry.getValue();
-        if (collectorAddr != null && !collectorAddr.isEmpty()) {
+        AppCollectorData collectorData = entry.getValue();
+        if (collectorData != null) {
+          if (!collectorData.isStamped()) {
+            // Stamp the collector if we have not done so
+            collectorData.setRMIdentifier(
+                ResourceManager.getClusterTimeStamp());
+            collectorData.setVersion(
+                timelineCollectorVersion.getAndIncrement());
+          }
           RMApp rmApp = rmApps.get(appId);
           if (rmApp == null) {
             LOG.warn("Cannot update collector info because application ID: " +
                 appId + " is not found in RMContext!");
           } else {
-            String previousCollectorAddr = rmApp.getCollectorAddr();
-            if (previousCollectorAddr == null
-                || !previousCollectorAddr.equals(collectorAddr)) {
-              rmApp.setCollectorAddr(collectorAddr);
+            AppCollectorData previousCollectorData = rmApp.getCollectorData();
+            if (AppCollectorData.happensBefore(previousCollectorData,
+                collectorData)) {
+              // Sending collector update event.
+              // Note: RM has to store the newly received collector data
+              // synchronously. Otherwise, the RM may send out stale collector
+              // data before this update is done, and the RM then crashes, the
+              // newly updated collector data will get lost.
+              LOG.info("Update collector information for application " + appId
+                  + " with new address: " + collectorData.getCollectorAddr());
+              ((RMAppImpl) rmApp).setCollectorData(collectorData);
             }
           }
         }

+ 11 - 19
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java

@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -39,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
@@ -180,27 +183,16 @@ public interface RMApp extends EventHandler<RMAppEvent> {
   String getTrackingUrl();
 
   /**
-   * The collector address for the application. It should be used only if the
-   * timeline service v.2 is enabled.
+   * The timeline collector information for the application. It should be used
+   * only if the timeline service v.2 is enabled.
    *
-   * @return the address for the application's collector, or null if the
-   * timeline service v.2 is not enabled.
+   * @return the data for the application's collector, including collector
+   * address, collector ID. Return null if the timeline service v.2 is not
+   * enabled.
    */
-  String getCollectorAddr();
-
-  /**
-   * Set collector address for the application. It should be used only if the
-   * timeline service v.2 is enabled.
-   *
-   * @param collectorAddr the address of collector
-   */
-  void setCollectorAddr(String collectorAddr);
-
-  /**
-   * Remove collector address when application is finished or killed. It should
-   * be used only if the timeline service v.2 is enabled.
-   */
-  void removeCollectorAddr();
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  AppCollectorData getCollectorData();
 
   /**
    * The original tracking url for the application master.

+ 0 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java

@@ -30,9 +30,6 @@ public enum RMAppEventType {
   // Source: Scheduler
   APP_ACCEPTED,
 
-  // TODO add source later
-  COLLECTOR_UPDATE,
-
   // Source: RMAppAttempt
   ATTEMPT_REGISTERED,
   ATTEMPT_UNREGISTERED,

+ 8 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
@@ -165,7 +166,7 @@ public class RMAppImpl implements RMApp, Recoverable {
   private long storedFinishTime = 0;
   private int firstAttemptIdInStateStore = 1;
   private int nextAttemptId = 1;
-  private volatile String collectorAddr;
+  private AppCollectorData collectorData;
   // This field isn't protected by readlock now.
   private volatile RMAppAttempt currentAttempt;
   private String queue;
@@ -611,18 +612,16 @@ public class RMAppImpl implements RMApp, Recoverable {
   }
 
   @Override
-  public String getCollectorAddr() {
-    return this.collectorAddr;
+  public AppCollectorData getCollectorData() {
+    return this.collectorData;
   }
 
-  @Override
-  public void setCollectorAddr(String collectorAddress) {
-    this.collectorAddr = collectorAddress;
+  public void setCollectorData(AppCollectorData incomingData) {
+    this.collectorData = incomingData;
   }
 
-  @Override
-  public void removeCollectorAddr() {
-    this.collectorAddr = null;
+  public void removeCollectorData() {
+    this.collectorData = null;
   }
 
   @Override

+ 24 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java

@@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@@ -70,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -1011,13 +1013,23 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     RMNodeImpl node2 =
         (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm2.getNodeId());
 
-    RMApp app1 = rm.submitApp(1024);
+    RMAppImpl app1 = (RMAppImpl) rm.submitApp(1024);
     String collectorAddr1 = "1.2.3.4:5";
-    app1.setCollectorAddr(collectorAddr1);
+    app1.setCollectorData(AppCollectorData.newInstance(
+        app1.getApplicationId(), collectorAddr1));
 
     String collectorAddr2 = "5.4.3.2:1";
-    RMApp app2 = rm.submitApp(1024);
-    app2.setCollectorAddr(collectorAddr2);
+    RMAppImpl app2 = (RMAppImpl) rm.submitApp(1024);
+    app2.setCollectorData(AppCollectorData.newInstance(
+        app2.getApplicationId(), collectorAddr2));
+
+    String collectorAddr3 = "5.4.3.2:2";
+    app2.setCollectorData(AppCollectorData.newInstance(
+        app2.getApplicationId(), collectorAddr3, 0, 1));
+
+    String collectorAddr4 = "5.4.3.2:3";
+    app2.setCollectorData(AppCollectorData.newInstance(
+        app2.getApplicationId(), collectorAddr4, 1, 0));
 
     // Create a running container for app1 running on nm1
     ContainerId runningContainerId1 = BuilderUtils.newContainerId(
@@ -1055,14 +1067,18 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     Assert.assertEquals(app2.getApplicationId(), node2.getRunningApps().get(0));
 
     nodeHeartbeat1 = nm1.nodeHeartbeat(true);
-    Map<ApplicationId, String> map1 = nodeHeartbeat1.getAppCollectorsMap();
+    Map<ApplicationId, AppCollectorData> map1
+        = nodeHeartbeat1.getAppCollectors();
     Assert.assertEquals(1, map1.size());
-    Assert.assertEquals(collectorAddr1, map1.get(app1.getApplicationId()));
+    Assert.assertEquals(collectorAddr1,
+        map1.get(app1.getApplicationId()).getCollectorAddr());
 
     nodeHeartbeat2 = nm2.nodeHeartbeat(true);
-    Map<ApplicationId, String> map2 = nodeHeartbeat2.getAppCollectorsMap();
+    Map<ApplicationId, AppCollectorData> map2
+        = nodeHeartbeat2.getAppCollectors();
     Assert.assertEquals(1, map2.size());
-    Assert.assertEquals(collectorAddr2, map2.get(app2.getApplicationId()));
+    Assert.assertEquals(collectorAddr4,
+        map2.get(app2.getApplicationId()).getCollectorAddr());
   }
 
   private void checkRebootedNMCount(MockRM rm2, int count)

+ 2 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@@ -97,15 +98,7 @@ public abstract class MockAsm extends MockApps {
       throw new UnsupportedOperationException("Not supported yet.");
     }
     @Override
-    public String getCollectorAddr() {
-      throw new UnsupportedOperationException("Not supported yet.");
-    }
-    @Override
-    public void setCollectorAddr(String collectorAddr) {
-      throw new UnsupportedOperationException("Not supported yet.");
-    }
-    @Override
-    public void removeCollectorAddr() {
+    public AppCollectorData getCollectorData() {
       throw new UnsupportedOperationException("Not supported yet.");
     }
     @Override

+ 2 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
@@ -305,17 +306,8 @@ public class MockRMApp implements RMApp {
     throw new UnsupportedOperationException("Not supported yet.");
   }
 
-  public String getCollectorAddr() {
-    throw new UnsupportedOperationException("Not supported yet.");
-  }
-
-  @Override
-  public void removeCollectorAddr() {
-    throw new UnsupportedOperationException("Not supported yet.");
-  }
-
   @Override
-  public void setCollectorAddr(String collectorAddr) {
+  public AppCollectorData getCollectorData() {
     throw new UnsupportedOperationException("Not supported yet.");
   }