|
@@ -21,10 +21,13 @@ package org.apache.hadoop.yarn.server.nodemanager;
|
|
|
import static java.util.concurrent.TimeUnit.SECONDS;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
|
import java.util.concurrent.ThreadFactory;
|
|
@@ -40,6 +43,10 @@ import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
|
|
import org.apache.hadoop.service.AbstractService;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
@@ -50,6 +57,8 @@ public class DeletionService extends AbstractService {
|
|
|
private final ContainerExecutor exec;
|
|
|
private ScheduledThreadPoolExecutor sched;
|
|
|
private static final FileContext lfs = getLfs();
|
|
|
+ private final NMStateStoreService stateStore;
|
|
|
+ private AtomicInteger nextTaskId = new AtomicInteger(0);
|
|
|
|
|
|
static final FileContext getLfs() {
|
|
|
try {
|
|
@@ -60,13 +69,17 @@ public class DeletionService extends AbstractService {
|
|
|
}
|
|
|
|
|
|
public DeletionService(ContainerExecutor exec) {
|
|
|
+ this(exec, new NMNullStateStoreService());
|
|
|
+ }
|
|
|
+
|
|
|
+ public DeletionService(ContainerExecutor exec,
|
|
|
+ NMStateStoreService stateStore) {
|
|
|
super(DeletionService.class.getName());
|
|
|
this.exec = exec;
|
|
|
this.debugDelay = 0;
|
|
|
+ this.stateStore = stateStore;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- *
|
|
|
/**
|
|
|
* Delete the path(s) as this user.
|
|
|
* @param user The user to delete as, or the JVM user if null
|
|
@@ -76,19 +89,20 @@ public class DeletionService extends AbstractService {
|
|
|
public void delete(String user, Path subDir, Path... baseDirs) {
|
|
|
// TODO if parent owned by NM, rename within parent inline
|
|
|
if (debugDelay != -1) {
|
|
|
- if (baseDirs == null || baseDirs.length == 0) {
|
|
|
- sched.schedule(new FileDeletionTask(this, user, subDir, null),
|
|
|
- debugDelay, TimeUnit.SECONDS);
|
|
|
- } else {
|
|
|
- sched.schedule(
|
|
|
- new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs)),
|
|
|
- debugDelay, TimeUnit.SECONDS);
|
|
|
+ List<Path> baseDirList = null;
|
|
|
+ if (baseDirs != null && baseDirs.length != 0) {
|
|
|
+ baseDirList = Arrays.asList(baseDirs);
|
|
|
}
|
|
|
+ FileDeletionTask task =
|
|
|
+ new FileDeletionTask(this, user, subDir, baseDirList);
|
|
|
+ recordDeletionTaskInStateStore(task);
|
|
|
+ sched.schedule(task, debugDelay, TimeUnit.SECONDS);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask) {
|
|
|
if (debugDelay != -1) {
|
|
|
+ recordDeletionTaskInStateStore(fileDeletionTask);
|
|
|
sched.schedule(fileDeletionTask, debugDelay, TimeUnit.SECONDS);
|
|
|
}
|
|
|
}
|
|
@@ -109,6 +123,9 @@ public class DeletionService extends AbstractService {
|
|
|
}
|
|
|
sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
|
|
|
sched.setKeepAliveTime(60L, SECONDS);
|
|
|
+ if (stateStore.canRecover()) {
|
|
|
+ recover(stateStore.loadDeletionServiceState());
|
|
|
+ }
|
|
|
super.serviceInit(conf);
|
|
|
}
|
|
|
|
|
@@ -139,6 +156,8 @@ public class DeletionService extends AbstractService {
|
|
|
}
|
|
|
|
|
|
public static class FileDeletionTask implements Runnable {
|
|
|
+ public static final int INVALID_TASK_ID = -1;
|
|
|
+ private int taskId;
|
|
|
private final String user;
|
|
|
private final Path subDir;
|
|
|
private final List<Path> baseDirs;
|
|
@@ -152,6 +171,12 @@ public class DeletionService extends AbstractService {
|
|
|
|
|
|
private FileDeletionTask(DeletionService delService, String user,
|
|
|
Path subDir, List<Path> baseDirs) {
|
|
|
+ this(INVALID_TASK_ID, delService, user, subDir, baseDirs);
|
|
|
+ }
|
|
|
+
|
|
|
+ private FileDeletionTask(int taskId, DeletionService delService,
|
|
|
+ String user, Path subDir, List<Path> baseDirs) {
|
|
|
+ this.taskId = taskId;
|
|
|
this.delService = delService;
|
|
|
this.user = user;
|
|
|
this.subDir = subDir;
|
|
@@ -198,6 +223,12 @@ public class DeletionService extends AbstractService {
|
|
|
return this.success;
|
|
|
}
|
|
|
|
|
|
+ public synchronized FileDeletionTask[] getSuccessorTasks() {
|
|
|
+ FileDeletionTask[] successors =
|
|
|
+ new FileDeletionTask[successorTaskSet.size()];
|
|
|
+ return successorTaskSet.toArray(successors);
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public void run() {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -286,6 +317,12 @@ public class DeletionService extends AbstractService {
|
|
|
* dependent tasks of it has failed marking its success = false.
|
|
|
*/
|
|
|
private synchronized void fileDeletionTaskFinished() {
|
|
|
+ try {
|
|
|
+ delService.stateStore.removeDeletionTask(taskId);
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("Unable to remove deletion task " + taskId
|
|
|
+ + " from state store", e);
|
|
|
+ }
|
|
|
Iterator<FileDeletionTask> successorTaskI =
|
|
|
this.successorTaskSet.iterator();
|
|
|
while (successorTaskI.hasNext()) {
|
|
@@ -318,4 +355,129 @@ public class DeletionService extends AbstractService {
|
|
|
Path[] baseDirs) {
|
|
|
return new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs));
|
|
|
}
|
|
|
+
|
|
|
+ private void recover(RecoveredDeletionServiceState state)
|
|
|
+ throws IOException {
|
|
|
+ List<DeletionServiceDeleteTaskProto> taskProtos = state.getTasks();
|
|
|
+ Map<Integer, DeletionTaskRecoveryInfo> idToInfoMap =
|
|
|
+ new HashMap<Integer, DeletionTaskRecoveryInfo>(taskProtos.size());
|
|
|
+ Set<Integer> successorTasks = new HashSet<Integer>();
|
|
|
+ for (DeletionServiceDeleteTaskProto proto : taskProtos) {
|
|
|
+ DeletionTaskRecoveryInfo info = parseTaskProto(proto);
|
|
|
+ idToInfoMap.put(info.task.taskId, info);
|
|
|
+ nextTaskId.set(Math.max(nextTaskId.get(), info.task.taskId));
|
|
|
+ successorTasks.addAll(info.successorTaskIds);
|
|
|
+ }
|
|
|
+
|
|
|
+ // restore the task dependencies and schedule the deletion tasks that
|
|
|
+ // have no predecessors
|
|
|
+ final long now = System.currentTimeMillis();
|
|
|
+ for (DeletionTaskRecoveryInfo info : idToInfoMap.values()) {
|
|
|
+ for (Integer successorId : info.successorTaskIds){
|
|
|
+ DeletionTaskRecoveryInfo successor = idToInfoMap.get(successorId);
|
|
|
+ if (successor != null) {
|
|
|
+ info.task.addFileDeletionTaskDependency(successor.task);
|
|
|
+ } else {
|
|
|
+ LOG.error("Unable to locate dependency task for deletion task "
|
|
|
+ + info.task.taskId + " at " + info.task.getSubDir());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (!successorTasks.contains(info.task.taskId)) {
|
|
|
+ long msecTilDeletion = info.deletionTimestamp - now;
|
|
|
+ sched.schedule(info.task, msecTilDeletion, TimeUnit.MILLISECONDS);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private DeletionTaskRecoveryInfo parseTaskProto(
|
|
|
+ DeletionServiceDeleteTaskProto proto) throws IOException {
|
|
|
+ int taskId = proto.getId();
|
|
|
+ String user = proto.hasUser() ? proto.getUser() : null;
|
|
|
+ Path subdir = null;
|
|
|
+ List<Path> basePaths = null;
|
|
|
+ if (proto.hasSubdir()) {
|
|
|
+ subdir = new Path(proto.getSubdir());
|
|
|
+ }
|
|
|
+ List<String> basedirs = proto.getBasedirsList();
|
|
|
+ if (basedirs != null && basedirs.size() > 0) {
|
|
|
+ basePaths = new ArrayList<Path>(basedirs.size());
|
|
|
+ for (String basedir : basedirs) {
|
|
|
+ basePaths.add(new Path(basedir));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ FileDeletionTask task = new FileDeletionTask(taskId, this, user,
|
|
|
+ subdir, basePaths);
|
|
|
+ return new DeletionTaskRecoveryInfo(task,
|
|
|
+ proto.getSuccessorIdsList(),
|
|
|
+ proto.getDeletionTime());
|
|
|
+ }
|
|
|
+
|
|
|
+ private int generateTaskId() {
|
|
|
+ // get the next ID but avoid an invalid ID
|
|
|
+ int taskId = nextTaskId.incrementAndGet();
|
|
|
+ while (taskId == FileDeletionTask.INVALID_TASK_ID) {
|
|
|
+ taskId = nextTaskId.incrementAndGet();
|
|
|
+ }
|
|
|
+ return taskId;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void recordDeletionTaskInStateStore(FileDeletionTask task) {
|
|
|
+ if (!stateStore.canRecover()) {
|
|
|
+ // optimize the case where we aren't really recording
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (task.taskId != FileDeletionTask.INVALID_TASK_ID) {
|
|
|
+ return; // task already recorded
|
|
|
+ }
|
|
|
+
|
|
|
+ task.taskId = generateTaskId();
|
|
|
+
|
|
|
+ FileDeletionTask[] successors = task.getSuccessorTasks();
|
|
|
+
|
|
|
+ // store successors first to ensure task IDs have been generated for them
|
|
|
+ for (FileDeletionTask successor : successors) {
|
|
|
+ recordDeletionTaskInStateStore(successor);
|
|
|
+ }
|
|
|
+
|
|
|
+ DeletionServiceDeleteTaskProto.Builder builder =
|
|
|
+ DeletionServiceDeleteTaskProto.newBuilder();
|
|
|
+ builder.setId(task.taskId);
|
|
|
+ if (task.getUser() != null) {
|
|
|
+ builder.setUser(task.getUser());
|
|
|
+ }
|
|
|
+ if (task.getSubDir() != null) {
|
|
|
+ builder.setSubdir(task.getSubDir().toString());
|
|
|
+ }
|
|
|
+ builder.setDeletionTime(System.currentTimeMillis() +
|
|
|
+ TimeUnit.MILLISECONDS.convert(debugDelay, TimeUnit.SECONDS));
|
|
|
+ if (task.getBaseDirs() != null) {
|
|
|
+ for (Path dir : task.getBaseDirs()) {
|
|
|
+ builder.addBasedirs(dir.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ for (FileDeletionTask successor : successors) {
|
|
|
+ builder.addSuccessorIds(successor.taskId);
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ stateStore.storeDeletionTask(task.taskId, builder.build());
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("Unable to store deletion task " + task.taskId + " for "
|
|
|
+ + task.getSubDir(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class DeletionTaskRecoveryInfo {
|
|
|
+ FileDeletionTask task;
|
|
|
+ List<Integer> successorTaskIds;
|
|
|
+ long deletionTimestamp;
|
|
|
+
|
|
|
+ public DeletionTaskRecoveryInfo(FileDeletionTask task,
|
|
|
+ List<Integer> successorTaskIds, long deletionTimestamp) {
|
|
|
+ this.task = task;
|
|
|
+ this.successorTaskIds = successorTaskIds;
|
|
|
+ this.deletionTimestamp = deletionTimestamp;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|