|
@@ -31,13 +31,16 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
|
|
import org.apache.hadoop.yarn.api.records.Container;
|
|
|
+import org.apache.hadoop.yarn.api.records.NMToken;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
|
|
|
+import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
|
|
|
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
|
|
|
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
|
|
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
|
|
|
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
|
|
|
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
|
|
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto;
|
|
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
|
|
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProtoOrBuilder;
|
|
|
|
|
@@ -56,7 +59,8 @@ public class RegisterApplicationMasterResponsePBImpl extends
|
|
|
|
|
|
private Resource maximumResourceCapability;
|
|
|
private Map<ApplicationAccessType, String> applicationACLS = null;
|
|
|
- private List<Container> containersFromPreviousAttempt = null;
|
|
|
+ private List<Container> containersFromPreviousAttempts = null;
|
|
|
+ private List<NMToken> nmTokens = null;
|
|
|
|
|
|
public RegisterApplicationMasterResponsePBImpl() {
|
|
|
builder = RegisterApplicationMasterResponseProto.newBuilder();
|
|
@@ -110,8 +114,13 @@ public class RegisterApplicationMasterResponsePBImpl extends
|
|
|
if (this.applicationACLS != null) {
|
|
|
addApplicationACLs();
|
|
|
}
|
|
|
- if (this.containersFromPreviousAttempt != null) {
|
|
|
- addRunningContainersToProto();
|
|
|
+ if (this.containersFromPreviousAttempts != null) {
|
|
|
+ addContainersFromPreviousAttemptToProto();
|
|
|
+ }
|
|
|
+ if (nmTokens != null) {
|
|
|
+ builder.clearNmTokensFromPreviousAttempts();
|
|
|
+ Iterable<NMTokenProto> iterable = getTokenProtoIterable(nmTokens);
|
|
|
+ builder.addAllNmTokensFromPreviousAttempts(iterable);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -236,21 +245,22 @@ public class RegisterApplicationMasterResponsePBImpl extends
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public List<Container> getContainersFromPreviousAttempt() {
|
|
|
- if (this.containersFromPreviousAttempt != null) {
|
|
|
- return this.containersFromPreviousAttempt;
|
|
|
+ public List<Container> getContainersFromPreviousAttempts() {
|
|
|
+ if (this.containersFromPreviousAttempts != null) {
|
|
|
+ return this.containersFromPreviousAttempts;
|
|
|
}
|
|
|
- initRunningContainersList();
|
|
|
- return this.containersFromPreviousAttempt;
|
|
|
+ initContainersPreviousAttemptList();
|
|
|
+ return this.containersFromPreviousAttempts;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void setContainersFromPreviousAttempt(final List<Container> containers) {
|
|
|
+ public void
|
|
|
+ setContainersFromPreviousAttempts(final List<Container> containers) {
|
|
|
if (containers == null) {
|
|
|
return;
|
|
|
}
|
|
|
- this.containersFromPreviousAttempt = new ArrayList<Container>();
|
|
|
- this.containersFromPreviousAttempt.addAll(containers);
|
|
|
+ this.containersFromPreviousAttempts = new ArrayList<Container>();
|
|
|
+ this.containersFromPreviousAttempts.addAll(containers);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -272,25 +282,88 @@ public class RegisterApplicationMasterResponsePBImpl extends
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void initRunningContainersList() {
|
|
|
- RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder;
|
|
|
- List<ContainerProto> list = p.getContainersFromPreviousAttemptList();
|
|
|
- containersFromPreviousAttempt = new ArrayList<Container>();
|
|
|
+
|
|
|
+ private void initContainersPreviousAttemptList() {
|
|
|
+ RegisterApplicationMasterResponseProtoOrBuilder p =
|
|
|
+ viaProto ? proto : builder;
|
|
|
+ List<ContainerProto> list = p.getContainersFromPreviousAttemptsList();
|
|
|
+ containersFromPreviousAttempts = new ArrayList<Container>();
|
|
|
for (ContainerProto c : list) {
|
|
|
- containersFromPreviousAttempt.add(convertFromProtoFormat(c));
|
|
|
+ containersFromPreviousAttempts.add(convertFromProtoFormat(c));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void addRunningContainersToProto() {
|
|
|
+ private void addContainersFromPreviousAttemptToProto() {
|
|
|
maybeInitBuilder();
|
|
|
- builder.clearContainersFromPreviousAttempt();
|
|
|
+ builder.clearContainersFromPreviousAttempts();
|
|
|
List<ContainerProto> list = new ArrayList<ContainerProto>();
|
|
|
- for (Container c : containersFromPreviousAttempt) {
|
|
|
+ for (Container c : containersFromPreviousAttempts) {
|
|
|
list.add(convertToProtoFormat(c));
|
|
|
}
|
|
|
- builder.addAllContainersFromPreviousAttempt(list);
|
|
|
+ builder.addAllContainersFromPreviousAttempts(list);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public List<NMToken> getNMTokensFromPreviousAttempts() {
|
|
|
+ if (nmTokens != null) {
|
|
|
+ return nmTokens;
|
|
|
+ }
|
|
|
+ initLocalNewNMTokenList();
|
|
|
+ return nmTokens;
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public void setNMTokensFromPreviousAttempts(final List<NMToken> nmTokens) {
|
|
|
+ if (nmTokens == null || nmTokens.isEmpty()) {
|
|
|
+ if (this.nmTokens != null) {
|
|
|
+ this.nmTokens.clear();
|
|
|
+ }
|
|
|
+ builder.clearNmTokensFromPreviousAttempts();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ this.nmTokens = new ArrayList<NMToken>();
|
|
|
+ this.nmTokens.addAll(nmTokens);
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void initLocalNewNMTokenList() {
|
|
|
+ RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder;
|
|
|
+ List<NMTokenProto> list = p.getNmTokensFromPreviousAttemptsList();
|
|
|
+ nmTokens = new ArrayList<NMToken>();
|
|
|
+ for (NMTokenProto t : list) {
|
|
|
+ nmTokens.add(convertFromProtoFormat(t));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized Iterable<NMTokenProto> getTokenProtoIterable(
|
|
|
+ final List<NMToken> nmTokenList) {
|
|
|
+ maybeInitBuilder();
|
|
|
+ return new Iterable<NMTokenProto>() {
|
|
|
+ @Override
|
|
|
+ public synchronized Iterator<NMTokenProto> iterator() {
|
|
|
+ return new Iterator<NMTokenProto>() {
|
|
|
+
|
|
|
+ Iterator<NMToken> iter = nmTokenList.iterator();
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean hasNext() {
|
|
|
+ return iter.hasNext();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public NMTokenProto next() {
|
|
|
+ return convertToProtoFormat(iter.next());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void remove() {
|
|
|
+ throw new UnsupportedOperationException();
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
private Resource convertFromProtoFormat(ResourceProto resource) {
|
|
|
return new ResourcePBImpl(resource);
|
|
|
}
|
|
@@ -306,4 +379,12 @@ public class RegisterApplicationMasterResponsePBImpl extends
|
|
|
private ContainerProto convertToProtoFormat(Container t) {
|
|
|
return ((ContainerPBImpl) t).getProto();
|
|
|
}
|
|
|
+
|
|
|
+ private NMTokenProto convertToProtoFormat(NMToken token) {
|
|
|
+ return ((NMTokenPBImpl) token).getProto();
|
|
|
+ }
|
|
|
+
|
|
|
+ private NMToken convertFromProtoFormat(NMTokenProto proto) {
|
|
|
+ return new NMTokenPBImpl(proto);
|
|
|
+ }
|
|
|
}
|