|
@@ -28,12 +28,16 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
|
|
import org.apache.hadoop.yarn.api.records.AMCommand;
|
|
|
import org.apache.hadoop.yarn.api.records.Container;
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
|
import org.apache.hadoop.yarn.api.records.NMToken;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
|
|
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
|
|
|
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceDecreasePBImpl;
|
|
|
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceIncreasePBImpl;
|
|
|
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
|
|
|
import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
|
|
|
import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
|
|
@@ -41,6 +45,8 @@ import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl;
|
|
|
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
|
|
|
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
|
|
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
|
|
|
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceDecreaseProto;
|
|
|
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceIncreaseProto;
|
|
|
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
|
|
|
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
|
|
|
import org.apache.hadoop.yarn.proto.YarnProtos.PreemptionMessageProto;
|
|
@@ -63,6 +69,8 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
|
|
private List<Container> allocatedContainers = null;
|
|
|
private List<NMToken> nmTokens = null;
|
|
|
private List<ContainerStatus> completedContainersStatuses = null;
|
|
|
+ private List<ContainerResourceIncrease> increasedContainers = null;
|
|
|
+ private List<ContainerResourceDecrease> decreasedContainers = null;
|
|
|
|
|
|
private List<NodeReport> updatedNodes = null;
|
|
|
private PreemptionMessage preempt;
|
|
@@ -108,7 +116,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
|
|
if (this.allocatedContainers != null) {
|
|
|
builder.clearAllocatedContainers();
|
|
|
Iterable<ContainerProto> iterable =
|
|
|
- getProtoIterable(this.allocatedContainers);
|
|
|
+ getContainerProtoIterable(this.allocatedContainers);
|
|
|
builder.addAllAllocatedContainers(iterable);
|
|
|
}
|
|
|
if (nmTokens != null) {
|
|
@@ -134,6 +142,18 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
|
|
if (this.preempt != null) {
|
|
|
builder.setPreempt(convertToProtoFormat(this.preempt));
|
|
|
}
|
|
|
+ if (this.increasedContainers != null) {
|
|
|
+ builder.clearIncreasedContainers();
|
|
|
+ Iterable<ContainerResourceIncreaseProto> iterable =
|
|
|
+ getIncreaseProtoIterable(this.increasedContainers);
|
|
|
+ builder.addAllIncreasedContainers(iterable);
|
|
|
+ }
|
|
|
+ if (this.decreasedContainers != null) {
|
|
|
+ builder.clearDecreasedContainers();
|
|
|
+ Iterable<ContainerResourceDecreaseProto> iterable =
|
|
|
+ getChangeProtoIterable(this.decreasedContainers);
|
|
|
+ builder.addAllDecreasedContainers(iterable);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private synchronized void mergeLocalToProto() {
|
|
@@ -306,6 +326,63 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
|
|
this.preempt = preempt;
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public synchronized List<ContainerResourceIncrease> getIncreasedContainers() {
|
|
|
+ initLocalIncreasedContainerList();
|
|
|
+ return increasedContainers;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized void setIncreasedContainers(
|
|
|
+ List<ContainerResourceIncrease> increasedContainers) {
|
|
|
+ if (increasedContainers == null)
|
|
|
+ return;
|
|
|
+ initLocalIncreasedContainerList();
|
|
|
+ this.increasedContainers.addAll(increasedContainers);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized List<ContainerResourceDecrease> getDecreasedContainers() {
|
|
|
+ initLocalDecreasedContainerList();
|
|
|
+ return decreasedContainers;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized void setDecreasedContainers(
|
|
|
+ List<ContainerResourceDecrease> decreasedContainers) {
|
|
|
+ if (decreasedContainers == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ initLocalDecreasedContainerList();
|
|
|
+ this.decreasedContainers.addAll(decreasedContainers);
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void initLocalIncreasedContainerList() {
|
|
|
+ if (this.increasedContainers != null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
|
|
|
+ List<ContainerResourceIncreaseProto> list = p.getIncreasedContainersList();
|
|
|
+ increasedContainers = new ArrayList<ContainerResourceIncrease>();
|
|
|
+
|
|
|
+ for (ContainerResourceIncreaseProto c : list) {
|
|
|
+ increasedContainers.add(convertFromProtoFormat(c));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void initLocalDecreasedContainerList() {
|
|
|
+ if (this.decreasedContainers != null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
|
|
|
+ List<ContainerResourceDecreaseProto> list = p.getDecreasedContainersList();
|
|
|
+ decreasedContainers = new ArrayList<ContainerResourceDecrease>();
|
|
|
+
|
|
|
+ for (ContainerResourceDecreaseProto c : list) {
|
|
|
+ decreasedContainers.add(convertFromProtoFormat(c));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
// Once this is called. updatedNodes will never be null - until a getProto is
|
|
|
// called.
|
|
|
private synchronized void initLocalNewNodeReportList() {
|
|
@@ -348,7 +425,71 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private synchronized Iterable<ContainerProto> getProtoIterable(
|
|
|
+ private synchronized Iterable<ContainerResourceIncreaseProto>
|
|
|
+ getIncreaseProtoIterable(
|
|
|
+ final List<ContainerResourceIncrease> newContainersList) {
|
|
|
+ maybeInitBuilder();
|
|
|
+ return new Iterable<ContainerResourceIncreaseProto>() {
|
|
|
+ @Override
|
|
|
+ public synchronized Iterator<ContainerResourceIncreaseProto> iterator() {
|
|
|
+ return new Iterator<ContainerResourceIncreaseProto>() {
|
|
|
+
|
|
|
+ Iterator<ContainerResourceIncrease> iter = newContainersList
|
|
|
+ .iterator();
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized boolean hasNext() {
|
|
|
+ return iter.hasNext();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized ContainerResourceIncreaseProto next() {
|
|
|
+ return convertToProtoFormat(iter.next());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized void remove() {
|
|
|
+ throw new UnsupportedOperationException();
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized Iterable<ContainerResourceDecreaseProto>
|
|
|
+ getChangeProtoIterable(
|
|
|
+ final List<ContainerResourceDecrease> newContainersList) {
|
|
|
+ maybeInitBuilder();
|
|
|
+ return new Iterable<ContainerResourceDecreaseProto>() {
|
|
|
+ @Override
|
|
|
+ public synchronized Iterator<ContainerResourceDecreaseProto> iterator() {
|
|
|
+ return new Iterator<ContainerResourceDecreaseProto>() {
|
|
|
+
|
|
|
+ Iterator<ContainerResourceDecrease> iter = newContainersList
|
|
|
+ .iterator();
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized boolean hasNext() {
|
|
|
+ return iter.hasNext();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized ContainerResourceDecreaseProto next() {
|
|
|
+ return convertToProtoFormat(iter.next());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized void remove() {
|
|
|
+ throw new UnsupportedOperationException();
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized Iterable<ContainerProto> getContainerProtoIterable(
|
|
|
final List<Container> newContainersList) {
|
|
|
maybeInitBuilder();
|
|
|
return new Iterable<ContainerProto>() {
|
|
@@ -467,7 +608,6 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
|
|
|
|
|
}
|
|
|
};
|
|
|
-
|
|
|
}
|
|
|
};
|
|
|
}
|
|
@@ -486,6 +626,26 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
|
|
completedContainersStatuses.add(convertFromProtoFormat(c));
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private synchronized ContainerResourceIncrease convertFromProtoFormat(
|
|
|
+ ContainerResourceIncreaseProto p) {
|
|
|
+ return new ContainerResourceIncreasePBImpl(p);
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized ContainerResourceIncreaseProto convertToProtoFormat(
|
|
|
+ ContainerResourceIncrease t) {
|
|
|
+ return ((ContainerResourceIncreasePBImpl) t).getProto();
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized ContainerResourceDecrease convertFromProtoFormat(
|
|
|
+ ContainerResourceDecreaseProto p) {
|
|
|
+ return new ContainerResourceDecreasePBImpl(p);
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized ContainerResourceDecreaseProto convertToProtoFormat(
|
|
|
+ ContainerResourceDecrease t) {
|
|
|
+ return ((ContainerResourceDecreasePBImpl) t).getProto();
|
|
|
+ }
|
|
|
|
|
|
private synchronized NodeReportPBImpl convertFromProtoFormat(
|
|
|
NodeReportProto p) {
|
|
@@ -500,8 +660,9 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
|
|
ContainerProto p) {
|
|
|
return new ContainerPBImpl(p);
|
|
|
}
|
|
|
-
|
|
|
- private synchronized ContainerProto convertToProtoFormat(Container t) {
|
|
|
+
|
|
|
+ private synchronized ContainerProto convertToProtoFormat(
|
|
|
+ Container t) {
|
|
|
return ((ContainerPBImpl)t).getProto();
|
|
|
}
|
|
|
|