|
@@ -65,14 +65,23 @@ public class LocalContainerAllocator extends RMCommunicator
|
|
|
private AtomicInteger containerCount = new AtomicInteger();
|
|
|
private long retryInterval;
|
|
|
private long retrystartTime;
|
|
|
+ private String nmHost;
|
|
|
+ private int nmPort;
|
|
|
+ private int nmHttpPort;
|
|
|
+ private ContainerId containerId;
|
|
|
|
|
|
private final RecordFactory recordFactory =
|
|
|
RecordFactoryProvider.getRecordFactory(null);
|
|
|
|
|
|
public LocalContainerAllocator(ClientService clientService,
|
|
|
- AppContext context) {
|
|
|
+ AppContext context, String nmHost, int nmPort, int nmHttpPort
|
|
|
+ , ContainerId cId) {
|
|
|
super(clientService, context);
|
|
|
this.eventHandler = context.getEventHandler();
|
|
|
+ this.nmHost = nmHost;
|
|
|
+ this.nmPort = nmPort;
|
|
|
+ this.nmHttpPort = nmHttpPort;
|
|
|
+ this.containerId = cId;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -131,17 +140,17 @@ public class LocalContainerAllocator extends RMCommunicator
|
|
|
LOG.info("Processing the event " + event.toString());
|
|
|
ContainerId cID = recordFactory.newRecordInstance(ContainerId.class);
|
|
|
cID.setApplicationAttemptId(applicationAttemptId);
|
|
|
- // use negative ids to denote that these are local. Need a better way ??
|
|
|
- cID.setId((-1) * containerCount.getAndIncrement());
|
|
|
+ // Assign the same container ID as the AM
|
|
|
+ cID.setId(this.containerId.getId());
|
|
|
|
|
|
Container container = recordFactory.newRecordInstance(Container.class);
|
|
|
container.setId(cID);
|
|
|
NodeId nodeId = Records.newRecord(NodeId.class);
|
|
|
- nodeId.setHost("localhost");
|
|
|
- nodeId.setPort(1234);
|
|
|
+ nodeId.setHost(this.nmHost);
|
|
|
+ nodeId.setPort(this.nmPort);
|
|
|
container.setNodeId(nodeId);
|
|
|
container.setContainerToken(null);
|
|
|
- container.setNodeHttpAddress("localhost:8042");
|
|
|
+ container.setNodeHttpAddress(this.nmHost + ":" + this.nmHttpPort);
|
|
|
// send the container-assigned event to task attempt
|
|
|
|
|
|
if (event.getAttemptID().getTaskId().getTaskType() == TaskType.MAP) {
|