|
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.sls.appmaster;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.security.PrivilegedExceptionAction;
|
|
|
-import java.text.MessageFormat;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.LinkedList;
|
|
@@ -45,7 +44,8 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
|
|
|
|
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
|
|
|
import org.apache.hadoop.yarn.sls.SLSRunner;
|
|
|
-import org.apache.log4j.Logger;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
|
@Private
|
|
|
@Unstable
|
|
@@ -116,7 +116,7 @@ public class MRAMSimulator extends AMSimulator {
|
|
|
private final static int MR_AM_CONTAINER_RESOURCE_MEMORY_MB = 1024;
|
|
|
private final static int MR_AM_CONTAINER_RESOURCE_VCORES = 1;
|
|
|
|
|
|
- public final Logger LOG = Logger.getLogger(MRAMSimulator.class);
|
|
|
+ public final Logger LOG = LoggerFactory.getLogger(MRAMSimulator.class);
|
|
|
|
|
|
public void init(int id, int heartbeatInterval,
|
|
|
List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
|
|
@@ -162,8 +162,7 @@ public class MRAMSimulator extends AMSimulator {
|
|
|
MR_AM_CONTAINER_RESOURCE_VCORES),
|
|
|
ResourceRequest.ANY, 1, 1);
|
|
|
ask.add(amRequest);
|
|
|
- LOG.debug(MessageFormat.format("Application {0} sends out allocate " +
|
|
|
- "request for its AM", appId));
|
|
|
+ LOG.debug("Application {} sends out allocate request for its AM", appId);
|
|
|
final AllocateRequest request = this.createAllocateRequest(ask);
|
|
|
|
|
|
UserGroupInformation ugi =
|
|
@@ -200,8 +199,8 @@ public class MRAMSimulator extends AMSimulator {
|
|
|
.addNewContainer(container, -1L);
|
|
|
// Start AM container
|
|
|
amContainer = container;
|
|
|
- LOG.debug(MessageFormat.format("Application {0} starts its " +
|
|
|
- "AM container ({1}).", appId, amContainer.getId()));
|
|
|
+ LOG.debug("Application {} starts its AM container ({}).", appId,
|
|
|
+ amContainer.getId());
|
|
|
isAMContainerRunning = true;
|
|
|
}
|
|
|
}
|
|
@@ -217,36 +216,35 @@ public class MRAMSimulator extends AMSimulator {
|
|
|
ContainerId containerId = cs.getContainerId();
|
|
|
if (cs.getExitStatus() == ContainerExitStatus.SUCCESS) {
|
|
|
if (assignedMaps.containsKey(containerId)) {
|
|
|
- LOG.debug(MessageFormat.format("Application {0} has one" +
|
|
|
- "mapper finished ({1}).", appId, containerId));
|
|
|
+ LOG.debug("Application {} has one mapper finished ({}).",
|
|
|
+ appId, containerId);
|
|
|
assignedMaps.remove(containerId);
|
|
|
mapFinished ++;
|
|
|
finishedContainers ++;
|
|
|
} else if (assignedReduces.containsKey(containerId)) {
|
|
|
- LOG.debug(MessageFormat.format("Application {0} has one" +
|
|
|
- "reducer finished ({1}).", appId, containerId));
|
|
|
+ LOG.debug("Application {} has one reducer finished ({}).",
|
|
|
+ appId, containerId);
|
|
|
assignedReduces.remove(containerId);
|
|
|
reduceFinished ++;
|
|
|
finishedContainers ++;
|
|
|
} else {
|
|
|
// am container released event
|
|
|
isFinished = true;
|
|
|
- LOG.info(MessageFormat.format("Application {0} goes to " +
|
|
|
- "finish.", appId));
|
|
|
+ LOG.info("Application {} goes to finish.", appId);
|
|
|
}
|
|
|
} else {
|
|
|
// container to be killed
|
|
|
if (assignedMaps.containsKey(containerId)) {
|
|
|
- LOG.debug(MessageFormat.format("Application {0} has one " +
|
|
|
- "mapper killed ({1}).", appId, containerId));
|
|
|
+ LOG.debug("Application {} has one mapper killed ({}).",
|
|
|
+ appId, containerId);
|
|
|
pendingFailedMaps.add(assignedMaps.remove(containerId));
|
|
|
} else if (assignedReduces.containsKey(containerId)) {
|
|
|
- LOG.debug(MessageFormat.format("Application {0} has one " +
|
|
|
- "reducer killed ({1}).", appId, containerId));
|
|
|
+ LOG.debug("Application {} has one reducer killed ({}).",
|
|
|
+ appId, containerId);
|
|
|
pendingFailedReduces.add(assignedReduces.remove(containerId));
|
|
|
} else {
|
|
|
- LOG.info(MessageFormat.format("Application {0}'s AM is " +
|
|
|
- "going to be killed. Restarting...", appId));
|
|
|
+ LOG.info("Application {}'s AM is going to be killed." +
|
|
|
+ " Restarting...", appId);
|
|
|
restart();
|
|
|
}
|
|
|
}
|
|
@@ -261,8 +259,8 @@ public class MRAMSimulator extends AMSimulator {
|
|
|
se.getNmMap().get(amContainer.getNodeId())
|
|
|
.cleanupContainer(amContainer.getId());
|
|
|
isAMContainerRunning = false;
|
|
|
- LOG.debug(MessageFormat.format("Application {0} sends out event " +
|
|
|
- "to clean up its AM container.", appId));
|
|
|
+ LOG.debug("Application {} sends out event to clean up"
|
|
|
+ + " its AM container.", appId);
|
|
|
isFinished = true;
|
|
|
break;
|
|
|
}
|
|
@@ -271,15 +269,15 @@ public class MRAMSimulator extends AMSimulator {
|
|
|
for (Container container : response.getAllocatedContainers()) {
|
|
|
if (! scheduledMaps.isEmpty()) {
|
|
|
ContainerSimulator cs = scheduledMaps.remove();
|
|
|
- LOG.debug(MessageFormat.format("Application {0} starts a " +
|
|
|
- "launch a mapper ({1}).", appId, container.getId()));
|
|
|
+ LOG.debug("Application {} starts to launch a mapper ({}).",
|
|
|
+ appId, container.getId());
|
|
|
assignedMaps.put(container.getId(), cs);
|
|
|
se.getNmMap().get(container.getNodeId())
|
|
|
.addNewContainer(container, cs.getLifeTime());
|
|
|
} else if (! this.scheduledReduces.isEmpty()) {
|
|
|
ContainerSimulator cs = scheduledReduces.remove();
|
|
|
- LOG.debug(MessageFormat.format("Application {0} starts a " +
|
|
|
- "launch a reducer ({1}).", appId, container.getId()));
|
|
|
+ LOG.debug("Application {} starts to launch a reducer ({}).",
|
|
|
+ appId, container.getId());
|
|
|
assignedReduces.put(container.getId(), cs);
|
|
|
se.getNmMap().get(container.getNodeId())
|
|
|
.addNewContainer(container, cs.getLifeTime());
|
|
@@ -324,15 +322,14 @@ public class MRAMSimulator extends AMSimulator {
|
|
|
// map phase
|
|
|
if (! pendingMaps.isEmpty()) {
|
|
|
ask = packageRequests(pendingMaps, PRIORITY_MAP);
|
|
|
- LOG.debug(MessageFormat.format("Application {0} sends out " +
|
|
|
- "request for {1} mappers.", appId, pendingMaps.size()));
|
|
|
+ LOG.debug("Application {} sends out request for {} mappers.",
|
|
|
+ appId, pendingMaps.size());
|
|
|
scheduledMaps.addAll(pendingMaps);
|
|
|
pendingMaps.clear();
|
|
|
} else if (! pendingFailedMaps.isEmpty() && scheduledMaps.isEmpty()) {
|
|
|
ask = packageRequests(pendingFailedMaps, PRIORITY_MAP);
|
|
|
- LOG.debug(MessageFormat.format("Application {0} sends out " +
|
|
|
- "requests for {1} failed mappers.", appId,
|
|
|
- pendingFailedMaps.size()));
|
|
|
+ LOG.debug("Application {} sends out requests for {} failed mappers.",
|
|
|
+ appId, pendingFailedMaps.size());
|
|
|
scheduledMaps.addAll(pendingFailedMaps);
|
|
|
pendingFailedMaps.clear();
|
|
|
}
|
|
@@ -340,16 +337,15 @@ public class MRAMSimulator extends AMSimulator {
|
|
|
// reduce phase
|
|
|
if (! pendingReduces.isEmpty()) {
|
|
|
ask = packageRequests(pendingReduces, PRIORITY_REDUCE);
|
|
|
- LOG.debug(MessageFormat.format("Application {0} sends out " +
|
|
|
- "requests for {1} reducers.", appId, pendingReduces.size()));
|
|
|
+ LOG.debug("Application {} sends out requests for {} reducers.",
|
|
|
+ appId, pendingReduces.size());
|
|
|
scheduledReduces.addAll(pendingReduces);
|
|
|
pendingReduces.clear();
|
|
|
} else if (! pendingFailedReduces.isEmpty()
|
|
|
&& scheduledReduces.isEmpty()) {
|
|
|
ask = packageRequests(pendingFailedReduces, PRIORITY_REDUCE);
|
|
|
- LOG.debug(MessageFormat.format("Application {0} sends out " +
|
|
|
- "request for {1} failed reducers.", appId,
|
|
|
- pendingFailedReduces.size()));
|
|
|
+ LOG.debug("Application {} sends out request for {} failed reducers.",
|
|
|
+ appId, pendingFailedReduces.size());
|
|
|
scheduledReduces.addAll(pendingFailedReduces);
|
|
|
pendingFailedReduces.clear();
|
|
|
}
|