|
@@ -21,11 +21,8 @@ package org.apache.hadoop.yarn.server.nodemanager;
|
|
|
import static java.util.concurrent.TimeUnit.SECONDS;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.Arrays;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
-import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
@@ -38,461 +35,176 @@ import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
-import org.apache.hadoop.fs.FileContext;
|
|
|
-import org.apache.hadoop.fs.Path;
|
|
|
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
|
|
import org.apache.hadoop.service.AbstractService;
|
|
|
import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
|
|
-import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.api.impl.pb.NMProtoUtils;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.recovery.DeletionTaskRecoveryInfo;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
|
|
-import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
|
|
|
|
|
|
-import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
|
|
|
|
public class DeletionService extends AbstractService {
|
|
|
- static final Log LOG = LogFactory.getLog(DeletionService.class);
|
|
|
+
|
|
|
+ private static final Log LOG = LogFactory.getLog(DeletionService.class);
|
|
|
+
|
|
|
private int debugDelay;
|
|
|
- private final ContainerExecutor exec;
|
|
|
- private ScheduledThreadPoolExecutor sched;
|
|
|
- private static final FileContext lfs = getLfs();
|
|
|
+ private final ContainerExecutor containerExecutor;
|
|
|
private final NMStateStoreService stateStore;
|
|
|
+ private ScheduledThreadPoolExecutor sched;
|
|
|
private AtomicInteger nextTaskId = new AtomicInteger(0);
|
|
|
|
|
|
- static final FileContext getLfs() {
|
|
|
- try {
|
|
|
- return FileContext.getLocalFSFileContext();
|
|
|
- } catch (UnsupportedFileSystemException e) {
|
|
|
- throw new RuntimeException(e);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
public DeletionService(ContainerExecutor exec) {
|
|
|
this(exec, new NMNullStateStoreService());
|
|
|
}
|
|
|
|
|
|
- public DeletionService(ContainerExecutor exec,
|
|
|
+ public DeletionService(ContainerExecutor containerExecutor,
|
|
|
NMStateStoreService stateStore) {
|
|
|
super(DeletionService.class.getName());
|
|
|
- this.exec = exec;
|
|
|
+ this.containerExecutor = containerExecutor;
|
|
|
this.debugDelay = 0;
|
|
|
this.stateStore = stateStore;
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * Delete the path(s) as this user.
|
|
|
- * @param user The user to delete as, or the JVM user if null
|
|
|
- * @param subDir the sub directory name
|
|
|
- * @param baseDirs the base directories which contains the subDir's
|
|
|
- */
|
|
|
- public void delete(String user, Path subDir, Path... baseDirs) {
|
|
|
- // TODO if parent owned by NM, rename within parent inline
|
|
|
- if (debugDelay != -1) {
|
|
|
- List<Path> baseDirList = null;
|
|
|
- if (baseDirs != null && baseDirs.length != 0) {
|
|
|
- baseDirList = Arrays.asList(baseDirs);
|
|
|
- }
|
|
|
- FileDeletionTask task =
|
|
|
- new FileDeletionTask(this, user, subDir, baseDirList);
|
|
|
- recordDeletionTaskInStateStore(task);
|
|
|
- sched.schedule(task, debugDelay, TimeUnit.SECONDS);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask) {
|
|
|
- if (debugDelay != -1) {
|
|
|
- recordDeletionTaskInStateStore(fileDeletionTask);
|
|
|
- sched.schedule(fileDeletionTask, debugDelay, TimeUnit.SECONDS);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- protected void serviceInit(Configuration conf) throws Exception {
|
|
|
- ThreadFactory tf = new ThreadFactoryBuilder()
|
|
|
- .setNameFormat("DeletionService #%d")
|
|
|
- .build();
|
|
|
- if (conf != null) {
|
|
|
- sched = new HadoopScheduledThreadPoolExecutor(
|
|
|
- conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT,
|
|
|
- YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf);
|
|
|
- debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0);
|
|
|
- } else {
|
|
|
- sched = new HadoopScheduledThreadPoolExecutor(
|
|
|
- YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf);
|
|
|
- }
|
|
|
- sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
|
|
|
- sched.setKeepAliveTime(60L, SECONDS);
|
|
|
- if (stateStore.canRecover()) {
|
|
|
- recover(stateStore.loadDeletionServiceState());
|
|
|
- }
|
|
|
- super.serviceInit(conf);
|
|
|
- }
|
|
|
|
|
|
- @Override
|
|
|
- protected void serviceStop() throws Exception {
|
|
|
- if (sched != null) {
|
|
|
- sched.shutdown();
|
|
|
- boolean terminated = false;
|
|
|
- try {
|
|
|
- terminated = sched.awaitTermination(10, SECONDS);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- }
|
|
|
- if (terminated != true) {
|
|
|
- sched.shutdownNow();
|
|
|
- }
|
|
|
- }
|
|
|
- super.serviceStop();
|
|
|
+ public int getDebugDelay() {
|
|
|
+ return debugDelay;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Determine if the service has completely stopped.
|
|
|
- * Used only by unit tests
|
|
|
- * @return true if service has completely stopped
|
|
|
- */
|
|
|
- @Private
|
|
|
- public boolean isTerminated() {
|
|
|
- return getServiceState() == STATE.STOPPED && sched.isTerminated();
|
|
|
+ public ContainerExecutor getContainerExecutor() {
|
|
|
+ return containerExecutor;
|
|
|
}
|
|
|
|
|
|
- public static class FileDeletionTask implements Runnable {
|
|
|
- public static final int INVALID_TASK_ID = -1;
|
|
|
- private int taskId;
|
|
|
- private final String user;
|
|
|
- private final Path subDir;
|
|
|
- private final List<Path> baseDirs;
|
|
|
- private final AtomicInteger numberOfPendingPredecessorTasks;
|
|
|
- private final Set<FileDeletionTask> successorTaskSet;
|
|
|
- private final DeletionService delService;
|
|
|
- // By default all tasks will start as success=true; however if any of
|
|
|
- // the dependent task fails then it will be marked as false in
|
|
|
- // fileDeletionTaskFinished().
|
|
|
- private boolean success;
|
|
|
-
|
|
|
- private FileDeletionTask(DeletionService delService, String user,
|
|
|
- Path subDir, List<Path> baseDirs) {
|
|
|
- this(INVALID_TASK_ID, delService, user, subDir, baseDirs);
|
|
|
- }
|
|
|
-
|
|
|
- private FileDeletionTask(int taskId, DeletionService delService,
|
|
|
- String user, Path subDir, List<Path> baseDirs) {
|
|
|
- this.taskId = taskId;
|
|
|
- this.delService = delService;
|
|
|
- this.user = user;
|
|
|
- this.subDir = subDir;
|
|
|
- this.baseDirs = baseDirs;
|
|
|
- this.successorTaskSet = new HashSet<FileDeletionTask>();
|
|
|
- this.numberOfPendingPredecessorTasks = new AtomicInteger(0);
|
|
|
- success = true;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * increments and returns pending predecessor task count
|
|
|
- */
|
|
|
- public int incrementAndGetPendingPredecessorTasks() {
|
|
|
- return numberOfPendingPredecessorTasks.incrementAndGet();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * decrements and returns pending predecessor task count
|
|
|
- */
|
|
|
- public int decrementAndGetPendingPredecessorTasks() {
|
|
|
- return numberOfPendingPredecessorTasks.decrementAndGet();
|
|
|
- }
|
|
|
-
|
|
|
- @VisibleForTesting
|
|
|
- public String getUser() {
|
|
|
- return this.user;
|
|
|
- }
|
|
|
-
|
|
|
- @VisibleForTesting
|
|
|
- public Path getSubDir() {
|
|
|
- return this.subDir;
|
|
|
- }
|
|
|
-
|
|
|
- @VisibleForTesting
|
|
|
- public List<Path> getBaseDirs() {
|
|
|
- return this.baseDirs;
|
|
|
- }
|
|
|
-
|
|
|
- public synchronized void setSuccess(boolean success) {
|
|
|
- this.success = success;
|
|
|
- }
|
|
|
-
|
|
|
- public synchronized boolean getSucess() {
|
|
|
- return this.success;
|
|
|
- }
|
|
|
-
|
|
|
- public synchronized FileDeletionTask[] getSuccessorTasks() {
|
|
|
- FileDeletionTask[] successors =
|
|
|
- new FileDeletionTask[successorTaskSet.size()];
|
|
|
- return successorTaskSet.toArray(successors);
|
|
|
- }
|
|
|
+ public NMStateStoreService getStateStore() {
|
|
|
+ return stateStore;
|
|
|
+ }
|
|
|
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
+ public void delete(DeletionTask deletionTask) {
|
|
|
+ if (debugDelay != -1) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug(this);
|
|
|
- }
|
|
|
- boolean error = false;
|
|
|
- if (null == user) {
|
|
|
- if (baseDirs == null || baseDirs.size() == 0) {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("NM deleting absolute path : " + subDir);
|
|
|
- }
|
|
|
- try {
|
|
|
- lfs.delete(subDir, true);
|
|
|
- } catch (IOException e) {
|
|
|
- error = true;
|
|
|
- LOG.warn("Failed to delete " + subDir);
|
|
|
- }
|
|
|
- } else {
|
|
|
- for (Path baseDir : baseDirs) {
|
|
|
- Path del = subDir == null? baseDir : new Path(baseDir, subDir);
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("NM deleting path : " + del);
|
|
|
- }
|
|
|
- try {
|
|
|
- lfs.delete(del, true);
|
|
|
- } catch (IOException e) {
|
|
|
- error = true;
|
|
|
- LOG.warn("Failed to delete " + subDir);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- } else {
|
|
|
- try {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug(
|
|
|
- "Deleting path: [" + subDir + "] as user: [" + user + "]");
|
|
|
- }
|
|
|
- if (baseDirs == null || baseDirs.size() == 0) {
|
|
|
- delService.exec.deleteAsUser(new DeletionAsUserContext.Builder()
|
|
|
- .setUser(user)
|
|
|
- .setSubDir(subDir)
|
|
|
- .build());
|
|
|
- } else {
|
|
|
- delService.exec.deleteAsUser(new DeletionAsUserContext.Builder()
|
|
|
- .setUser(user)
|
|
|
- .setSubDir(subDir)
|
|
|
- .setBasedirs(baseDirs.toArray(new Path[0]))
|
|
|
- .build());
|
|
|
- }
|
|
|
- } catch (IOException e) {
|
|
|
- error = true;
|
|
|
- LOG.warn("Failed to delete as user " + user, e);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- error = true;
|
|
|
- LOG.warn("Failed to delete as user " + user, e);
|
|
|
- }
|
|
|
- }
|
|
|
- if (error) {
|
|
|
- setSuccess(!error);
|
|
|
- }
|
|
|
- fileDeletionTaskFinished();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public String toString() {
|
|
|
- StringBuffer sb = new StringBuffer("\nFileDeletionTask : ");
|
|
|
- sb.append(" user : ").append(this.user);
|
|
|
- sb.append(" subDir : ").append(
|
|
|
- subDir == null ? "null" : subDir.toString());
|
|
|
- sb.append(" baseDir : ");
|
|
|
- if (baseDirs == null || baseDirs.size() == 0) {
|
|
|
- sb.append("null");
|
|
|
- } else {
|
|
|
- for (Path baseDir : baseDirs) {
|
|
|
- sb.append(baseDir.toString()).append(',');
|
|
|
- }
|
|
|
- }
|
|
|
- return sb.toString();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * If there is a task dependency between say tasks 1,2,3 such that
|
|
|
- * task2 and task3 can be started only after task1 then we should define
|
|
|
- * task2 and task3 as successor tasks for task1.
|
|
|
- * Note:- Task dependency should be defined prior to
|
|
|
- * @param successorTask
|
|
|
- */
|
|
|
- public synchronized void addFileDeletionTaskDependency(
|
|
|
- FileDeletionTask successorTask) {
|
|
|
- if (successorTaskSet.add(successorTask)) {
|
|
|
- successorTask.incrementAndGetPendingPredecessorTasks();
|
|
|
+ String msg = String.format("Scheduling DeletionTask (delay %d) : %s",
|
|
|
+ debugDelay, deletionTask.toString());
|
|
|
+ LOG.debug(msg);
|
|
|
}
|
|
|
+ recordDeletionTaskInStateStore(deletionTask);
|
|
|
+ sched.schedule(deletionTask, debugDelay, TimeUnit.SECONDS);
|
|
|
}
|
|
|
-
|
|
|
- /*
|
|
|
- * This is called when
|
|
|
- * 1) Current file deletion task ran and finished.
|
|
|
- * 2) This can be even directly called by predecessor task if one of the
|
|
|
- * dependent tasks of it has failed marking its success = false.
|
|
|
- */
|
|
|
- private synchronized void fileDeletionTaskFinished() {
|
|
|
- try {
|
|
|
- delService.stateStore.removeDeletionTask(taskId);
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.error("Unable to remove deletion task " + taskId
|
|
|
- + " from state store", e);
|
|
|
- }
|
|
|
- Iterator<FileDeletionTask> successorTaskI =
|
|
|
- this.successorTaskSet.iterator();
|
|
|
- while (successorTaskI.hasNext()) {
|
|
|
- FileDeletionTask successorTask = successorTaskI.next();
|
|
|
- if (!success) {
|
|
|
- successorTask.setSuccess(success);
|
|
|
- }
|
|
|
- int count = successorTask.decrementAndGetPendingPredecessorTasks();
|
|
|
- if (count == 0) {
|
|
|
- if (successorTask.getSucess()) {
|
|
|
- successorTask.delService.scheduleFileDeletionTask(successorTask);
|
|
|
- } else {
|
|
|
- successorTask.fileDeletionTaskFinished();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Helper method to create file deletion task. To be used only if we need
|
|
|
- * a way to define dependencies between deletion tasks.
|
|
|
- * @param user user on whose behalf this task is suppose to run
|
|
|
- * @param subDir sub directory as required in
|
|
|
- * {@link DeletionService#delete(String, Path, Path...)}
|
|
|
- * @param baseDirs base directories as required in
|
|
|
- * {@link DeletionService#delete(String, Path, Path...)}
|
|
|
- */
|
|
|
- public FileDeletionTask createFileDeletionTask(String user, Path subDir,
|
|
|
- Path[] baseDirs) {
|
|
|
- return new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs));
|
|
|
}
|
|
|
|
|
|
- private void recover(RecoveredDeletionServiceState state)
|
|
|
+ private void recover(NMStateStoreService.RecoveredDeletionServiceState state)
|
|
|
throws IOException {
|
|
|
List<DeletionServiceDeleteTaskProto> taskProtos = state.getTasks();
|
|
|
Map<Integer, DeletionTaskRecoveryInfo> idToInfoMap =
|
|
|
- new HashMap<Integer, DeletionTaskRecoveryInfo>(taskProtos.size());
|
|
|
- Set<Integer> successorTasks = new HashSet<Integer>();
|
|
|
+ new HashMap<>(taskProtos.size());
|
|
|
+ Set<Integer> successorTasks = new HashSet<>();
|
|
|
for (DeletionServiceDeleteTaskProto proto : taskProtos) {
|
|
|
- DeletionTaskRecoveryInfo info = parseTaskProto(proto);
|
|
|
- idToInfoMap.put(info.task.taskId, info);
|
|
|
- nextTaskId.set(Math.max(nextTaskId.get(), info.task.taskId));
|
|
|
- successorTasks.addAll(info.successorTaskIds);
|
|
|
+ DeletionTaskRecoveryInfo info =
|
|
|
+ NMProtoUtils.convertProtoToDeletionTaskRecoveryInfo(proto, this);
|
|
|
+ idToInfoMap.put(info.getTask().getTaskId(), info);
|
|
|
+ nextTaskId.set(Math.max(nextTaskId.get(), info.getTask().getTaskId()));
|
|
|
+ successorTasks.addAll(info.getSuccessorTaskIds());
|
|
|
}
|
|
|
|
|
|
// restore the task dependencies and schedule the deletion tasks that
|
|
|
// have no predecessors
|
|
|
final long now = System.currentTimeMillis();
|
|
|
for (DeletionTaskRecoveryInfo info : idToInfoMap.values()) {
|
|
|
- for (Integer successorId : info.successorTaskIds){
|
|
|
+ for (Integer successorId : info.getSuccessorTaskIds()){
|
|
|
DeletionTaskRecoveryInfo successor = idToInfoMap.get(successorId);
|
|
|
if (successor != null) {
|
|
|
- info.task.addFileDeletionTaskDependency(successor.task);
|
|
|
+ info.getTask().addDeletionTaskDependency(successor.getTask());
|
|
|
} else {
|
|
|
LOG.error("Unable to locate dependency task for deletion task "
|
|
|
- + info.task.taskId + " at " + info.task.getSubDir());
|
|
|
+ + info.getTask().getTaskId());
|
|
|
}
|
|
|
}
|
|
|
- if (!successorTasks.contains(info.task.taskId)) {
|
|
|
- long msecTilDeletion = info.deletionTimestamp - now;
|
|
|
- sched.schedule(info.task, msecTilDeletion, TimeUnit.MILLISECONDS);
|
|
|
+ if (!successorTasks.contains(info.getTask().getTaskId())) {
|
|
|
+ long msecTilDeletion = info.getDeletionTimestamp() - now;
|
|
|
+ sched.schedule(info.getTask(), msecTilDeletion, TimeUnit.MILLISECONDS);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private DeletionTaskRecoveryInfo parseTaskProto(
|
|
|
- DeletionServiceDeleteTaskProto proto) throws IOException {
|
|
|
- int taskId = proto.getId();
|
|
|
- String user = proto.hasUser() ? proto.getUser() : null;
|
|
|
- Path subdir = null;
|
|
|
- List<Path> basePaths = null;
|
|
|
- if (proto.hasSubdir()) {
|
|
|
- subdir = new Path(proto.getSubdir());
|
|
|
- }
|
|
|
- List<String> basedirs = proto.getBasedirsList();
|
|
|
- if (basedirs != null && basedirs.size() > 0) {
|
|
|
- basePaths = new ArrayList<Path>(basedirs.size());
|
|
|
- for (String basedir : basedirs) {
|
|
|
- basePaths.add(new Path(basedir));
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- FileDeletionTask task = new FileDeletionTask(taskId, this, user,
|
|
|
- subdir, basePaths);
|
|
|
- return new DeletionTaskRecoveryInfo(task,
|
|
|
- proto.getSuccessorIdsList(),
|
|
|
- proto.getDeletionTime());
|
|
|
- }
|
|
|
-
|
|
|
private int generateTaskId() {
|
|
|
// get the next ID but avoid an invalid ID
|
|
|
int taskId = nextTaskId.incrementAndGet();
|
|
|
- while (taskId == FileDeletionTask.INVALID_TASK_ID) {
|
|
|
+ while (taskId == DeletionTask.INVALID_TASK_ID) {
|
|
|
taskId = nextTaskId.incrementAndGet();
|
|
|
}
|
|
|
return taskId;
|
|
|
}
|
|
|
|
|
|
- private void recordDeletionTaskInStateStore(FileDeletionTask task) {
|
|
|
+ private void recordDeletionTaskInStateStore(DeletionTask task) {
|
|
|
if (!stateStore.canRecover()) {
|
|
|
// optimize the case where we aren't really recording
|
|
|
return;
|
|
|
}
|
|
|
- if (task.taskId != FileDeletionTask.INVALID_TASK_ID) {
|
|
|
+ if (task.getTaskId() != DeletionTask.INVALID_TASK_ID) {
|
|
|
return; // task already recorded
|
|
|
}
|
|
|
|
|
|
- task.taskId = generateTaskId();
|
|
|
-
|
|
|
- FileDeletionTask[] successors = task.getSuccessorTasks();
|
|
|
+ task.setTaskId(generateTaskId());
|
|
|
|
|
|
// store successors first to ensure task IDs have been generated for them
|
|
|
- for (FileDeletionTask successor : successors) {
|
|
|
+ DeletionTask[] successors = task.getSuccessorTasks();
|
|
|
+ for (DeletionTask successor : successors) {
|
|
|
recordDeletionTaskInStateStore(successor);
|
|
|
}
|
|
|
|
|
|
- DeletionServiceDeleteTaskProto.Builder builder =
|
|
|
- DeletionServiceDeleteTaskProto.newBuilder();
|
|
|
- builder.setId(task.taskId);
|
|
|
- if (task.getUser() != null) {
|
|
|
- builder.setUser(task.getUser());
|
|
|
- }
|
|
|
- if (task.getSubDir() != null) {
|
|
|
- builder.setSubdir(task.getSubDir().toString());
|
|
|
- }
|
|
|
- builder.setDeletionTime(System.currentTimeMillis() +
|
|
|
- TimeUnit.MILLISECONDS.convert(debugDelay, TimeUnit.SECONDS));
|
|
|
- if (task.getBaseDirs() != null) {
|
|
|
- for (Path dir : task.getBaseDirs()) {
|
|
|
- builder.addBasedirs(dir.toString());
|
|
|
- }
|
|
|
- }
|
|
|
- for (FileDeletionTask successor : successors) {
|
|
|
- builder.addSuccessorIds(successor.taskId);
|
|
|
- }
|
|
|
-
|
|
|
try {
|
|
|
- stateStore.storeDeletionTask(task.taskId, builder.build());
|
|
|
+ stateStore.storeDeletionTask(task.getTaskId(),
|
|
|
+ task.convertDeletionTaskToProto());
|
|
|
} catch (IOException e) {
|
|
|
- LOG.error("Unable to store deletion task " + task.taskId + " for "
|
|
|
- + task.getSubDir(), e);
|
|
|
+ LOG.error("Unable to store deletion task " + task.getTaskId(), e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static class DeletionTaskRecoveryInfo {
|
|
|
- FileDeletionTask task;
|
|
|
- List<Integer> successorTaskIds;
|
|
|
- long deletionTimestamp;
|
|
|
+ @Override
|
|
|
+ protected void serviceInit(Configuration conf) throws Exception {
|
|
|
+ ThreadFactory tf = new ThreadFactoryBuilder()
|
|
|
+ .setNameFormat("DeletionService #%d")
|
|
|
+ .build();
|
|
|
+ if (conf != null) {
|
|
|
+ sched = new HadoopScheduledThreadPoolExecutor(
|
|
|
+ conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT,
|
|
|
+ YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf);
|
|
|
+ debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0);
|
|
|
+ } else {
|
|
|
+ sched = new HadoopScheduledThreadPoolExecutor(
|
|
|
+ YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf);
|
|
|
+ }
|
|
|
+ sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
|
|
|
+ sched.setKeepAliveTime(60L, SECONDS);
|
|
|
+ if (stateStore.canRecover()) {
|
|
|
+ recover(stateStore.loadDeletionServiceState());
|
|
|
+ }
|
|
|
+ super.serviceInit(conf);
|
|
|
+ }
|
|
|
|
|
|
- public DeletionTaskRecoveryInfo(FileDeletionTask task,
|
|
|
- List<Integer> successorTaskIds, long deletionTimestamp) {
|
|
|
- this.task = task;
|
|
|
- this.successorTaskIds = successorTaskIds;
|
|
|
- this.deletionTimestamp = deletionTimestamp;
|
|
|
+ @Override
|
|
|
+ public void serviceStop() throws Exception {
|
|
|
+ if (sched != null) {
|
|
|
+ sched.shutdown();
|
|
|
+ boolean terminated = false;
|
|
|
+ try {
|
|
|
+ terminated = sched.awaitTermination(10, SECONDS);
|
|
|
+ } catch (InterruptedException e) { }
|
|
|
+ if (!terminated) {
|
|
|
+ sched.shutdownNow();
|
|
|
+ }
|
|
|
}
|
|
|
+ super.serviceStop();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Determine if the service has completely stopped.
|
|
|
+ * Used only by unit tests
|
|
|
+ * @return true if service has completely stopped
|
|
|
+ */
|
|
|
+ @Private
|
|
|
+ public boolean isTerminated() {
|
|
|
+ return getServiceState() == STATE.STOPPED && sched.isTerminated();
|
|
|
}
|
|
|
}
|