|
@@ -27,6 +27,7 @@ import java.io.IOException;
|
|
|
import java.text.DateFormat;
|
|
|
import java.text.ParseException;
|
|
|
import java.text.SimpleDateFormat;
|
|
|
+import java.util.Collection;
|
|
|
import java.util.Date;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
@@ -66,23 +67,18 @@ public class TrashPolicyDefault extends TrashPolicy {
|
|
|
new SimpleDateFormat("yyMMddHHmm");
|
|
|
private static final int MSECS_PER_MINUTE = 60*1000;
|
|
|
|
|
|
- private Path current;
|
|
|
- private Path homesParent;
|
|
|
private long emptierInterval;
|
|
|
|
|
|
public TrashPolicyDefault() { }
|
|
|
|
|
|
- private TrashPolicyDefault(FileSystem fs, Path home, Configuration conf)
|
|
|
+ private TrashPolicyDefault(FileSystem fs, Configuration conf)
|
|
|
throws IOException {
|
|
|
- initialize(conf, fs, home);
|
|
|
+ initialize(conf, fs);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void initialize(Configuration conf, FileSystem fs, Path home) {
|
|
|
this.fs = fs;
|
|
|
- this.trash = new Path(home, TRASH);
|
|
|
- this.homesParent = home.getParent();
|
|
|
- this.current = new Path(trash, CURRENT);
|
|
|
this.deletionInterval = (long)(conf.getFloat(
|
|
|
FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT)
|
|
|
* MSECS_PER_MINUTE);
|
|
@@ -91,6 +87,17 @@ public class TrashPolicyDefault extends TrashPolicy {
|
|
|
* MSECS_PER_MINUTE);
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public void initialize(Configuration conf, FileSystem fs) {
|
|
|
+ this.fs = fs;
|
|
|
+ this.deletionInterval = (long)(conf.getFloat(
|
|
|
+ FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT)
|
|
|
+ * MSECS_PER_MINUTE);
|
|
|
+ this.emptierInterval = (long)(conf.getFloat(
|
|
|
+ FS_TRASH_CHECKPOINT_INTERVAL_KEY, FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT)
|
|
|
+ * MSECS_PER_MINUTE);
|
|
|
+ }
|
|
|
+
|
|
|
private Path makeTrashRelativePath(Path basePath, Path rmFilePath) {
|
|
|
return Path.mergePaths(basePath, rmFilePath);
|
|
|
}
|
|
@@ -113,17 +120,19 @@ public class TrashPolicyDefault extends TrashPolicy {
|
|
|
|
|
|
String qpath = fs.makeQualified(path).toString();
|
|
|
|
|
|
- if (qpath.startsWith(trash.toString())) {
|
|
|
+ Path trashRoot = fs.getTrashRoot(path);
|
|
|
+ Path trashCurrent = new Path(trashRoot, CURRENT);
|
|
|
+ if (qpath.startsWith(trashRoot.toString())) {
|
|
|
return false; // already in trash
|
|
|
}
|
|
|
|
|
|
- if (trash.getParent().toString().startsWith(qpath)) {
|
|
|
+ if (trashRoot.getParent().toString().startsWith(qpath)) {
|
|
|
throw new IOException("Cannot move \"" + path +
|
|
|
"\" to the trash, as it contains the trash");
|
|
|
}
|
|
|
|
|
|
- Path trashPath = makeTrashRelativePath(current, path);
|
|
|
- Path baseTrashPath = makeTrashRelativePath(current, path.getParent());
|
|
|
+ Path trashPath = makeTrashRelativePath(trashCurrent, path);
|
|
|
+ Path baseTrashPath = makeTrashRelativePath(trashCurrent, path.getParent());
|
|
|
|
|
|
IOException cause = null;
|
|
|
|
|
@@ -148,14 +157,16 @@ public class TrashPolicyDefault extends TrashPolicy {
|
|
|
trashPath = new Path(orig + Time.now());
|
|
|
}
|
|
|
|
|
|
- if (fs.rename(path, trashPath)) // move to current trash
|
|
|
+ if (fs.rename(path, trashPath)) { // move to current trash
|
|
|
+ LOG.info("Moved: '" + path + "' to trash at: " + trashPath);
|
|
|
return true;
|
|
|
+ }
|
|
|
} catch (IOException e) {
|
|
|
cause = e;
|
|
|
}
|
|
|
}
|
|
|
throw (IOException)
|
|
|
- new IOException("Failed to move to trash: "+path).initCause(cause);
|
|
|
+ new IOException("Failed to move to trash: " + path).initCause(cause);
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings("deprecation")
|
|
@@ -166,72 +177,32 @@ public class TrashPolicyDefault extends TrashPolicy {
|
|
|
|
|
|
@SuppressWarnings("deprecation")
|
|
|
public void createCheckpoint(Date date) throws IOException {
|
|
|
-
|
|
|
- if (!fs.exists(current)) // no trash, no checkpoint
|
|
|
- return;
|
|
|
-
|
|
|
- Path checkpointBase;
|
|
|
- synchronized (CHECKPOINT) {
|
|
|
- checkpointBase = new Path(trash, CHECKPOINT.format(date));
|
|
|
-
|
|
|
+ Collection<FileStatus> trashRoots = fs.getTrashRoots(false);
|
|
|
+ for (FileStatus trashRoot: trashRoots) {
|
|
|
+ LOG.info("TrashPolicyDefault#createCheckpoint for trashRoot: " +
|
|
|
+ trashRoot.getPath());
|
|
|
+ createCheckpoint(trashRoot.getPath(), date);
|
|
|
}
|
|
|
- Path checkpoint = checkpointBase;
|
|
|
-
|
|
|
- int attempt = 0;
|
|
|
- while (true) {
|
|
|
- try {
|
|
|
- fs.rename(current, checkpoint, Rename.NONE);
|
|
|
- break;
|
|
|
- } catch (FileAlreadyExistsException e) {
|
|
|
- if (++attempt > 1000) {
|
|
|
- throw new IOException("Failed to checkpoint trash: "+checkpoint);
|
|
|
- }
|
|
|
- checkpoint = checkpointBase.suffix("-" + attempt);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- LOG.info("Created trash checkpoint: "+checkpoint.toUri().getPath());
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void deleteCheckpoint() throws IOException {
|
|
|
- FileStatus[] dirs = null;
|
|
|
-
|
|
|
- try {
|
|
|
- dirs = fs.listStatus(trash); // scan trash sub-directories
|
|
|
- } catch (FileNotFoundException fnfe) {
|
|
|
- return;
|
|
|
+ Collection<FileStatus> trashRoots = fs.getTrashRoots(false);
|
|
|
+ for (FileStatus trashRoot : trashRoots) {
|
|
|
+ LOG.info("TrashPolicyDefault#deleteCheckpoint for trashRoot: " +
|
|
|
+ trashRoot.getPath());
|
|
|
+ deleteCheckpoint(trashRoot.getPath());
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- long now = Time.now();
|
|
|
- for (int i = 0; i < dirs.length; i++) {
|
|
|
- Path path = dirs[i].getPath();
|
|
|
- String dir = path.toUri().getPath();
|
|
|
- String name = path.getName();
|
|
|
- if (name.equals(CURRENT.getName())) // skip current
|
|
|
- continue;
|
|
|
-
|
|
|
- long time;
|
|
|
- try {
|
|
|
- time = getTimeFromCheckpoint(name);
|
|
|
- } catch (ParseException e) {
|
|
|
- LOG.warn("Unexpected item in trash: "+dir+". Ignoring.");
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- if ((now - deletionInterval) > time) {
|
|
|
- if (fs.delete(path, true)) {
|
|
|
- LOG.info("Deleted trash checkpoint: "+dir);
|
|
|
- } else {
|
|
|
- LOG.warn("Couldn't delete checkpoint: "+dir+" Ignoring.");
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ public Path getCurrentTrashDir() throws IOException {
|
|
|
+ return new Path(fs.getTrashRoot(null), CURRENT);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public Path getCurrentTrashDir() {
|
|
|
- return current;
|
|
|
+ public Path getCurrentTrashDir(Path path) throws IOException {
|
|
|
+ return new Path(fs.getTrashRoot(path), CURRENT);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -278,25 +249,24 @@ public class TrashPolicyDefault extends TrashPolicy {
|
|
|
try {
|
|
|
now = Time.now();
|
|
|
if (now >= end) {
|
|
|
-
|
|
|
- FileStatus[] homes = null;
|
|
|
+ Collection<FileStatus> trashRoots;
|
|
|
try {
|
|
|
- homes = fs.listStatus(homesParent); // list all home dirs
|
|
|
+ trashRoots = fs.getTrashRoots(true); // list all home dirs
|
|
|
} catch (IOException e) {
|
|
|
- LOG.warn("Trash can't list homes: "+e+" Sleeping.");
|
|
|
+ LOG.warn("Trash can't list all trash roots: "+e+" Sleeping.");
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
- for (FileStatus home : homes) { // dump each trash
|
|
|
- if (!home.isDirectory())
|
|
|
+ for (FileStatus trashRoot : trashRoots) { // dump each trash
|
|
|
+ if (!trashRoot.isDirectory())
|
|
|
continue;
|
|
|
try {
|
|
|
- TrashPolicyDefault trash = new TrashPolicyDefault(
|
|
|
- fs, home.getPath(), conf);
|
|
|
- trash.deleteCheckpoint();
|
|
|
- trash.createCheckpoint(new Date(now));
|
|
|
+ TrashPolicyDefault trash = new TrashPolicyDefault(fs, conf);
|
|
|
+ trash.deleteCheckpoint(trashRoot.getPath());
|
|
|
+ trash.createCheckpoint(trashRoot.getPath(), new Date(now));
|
|
|
} catch (IOException e) {
|
|
|
- LOG.warn("Trash caught: "+e+". Skipping "+home.getPath()+".");
|
|
|
+ LOG.warn("Trash caught: "+e+". Skipping " +
|
|
|
+ trashRoot.getPath() + ".");
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -319,6 +289,69 @@ public class TrashPolicyDefault extends TrashPolicy {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void createCheckpoint(Path trashRoot, Date date) throws IOException {
|
|
|
+ if (!fs.exists(new Path(trashRoot, CURRENT))) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ Path checkpointBase;
|
|
|
+ synchronized (CHECKPOINT) {
|
|
|
+ checkpointBase = new Path(trashRoot, CHECKPOINT.format(date));
|
|
|
+ }
|
|
|
+ Path checkpoint = checkpointBase;
|
|
|
+ Path current = new Path(trashRoot, CURRENT);
|
|
|
+
|
|
|
+ int attempt = 0;
|
|
|
+ while (true) {
|
|
|
+ try {
|
|
|
+ fs.rename(current, checkpoint, Rename.NONE);
|
|
|
+ LOG.info("Created trash checkpoint: " + checkpoint.toUri().getPath());
|
|
|
+ break;
|
|
|
+ } catch (FileAlreadyExistsException e) {
|
|
|
+ if (++attempt > 1000) {
|
|
|
+ throw new IOException("Failed to checkpoint trash: " + checkpoint);
|
|
|
+ }
|
|
|
+ checkpoint = checkpointBase.suffix("-" + attempt);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void deleteCheckpoint(Path trashRoot) throws IOException {
|
|
|
+ LOG.info("TrashPolicyDefault#deleteCheckpoint for trashRoot: " + trashRoot);
|
|
|
+
|
|
|
+ FileStatus[] dirs = null;
|
|
|
+ try {
|
|
|
+ dirs = fs.listStatus(trashRoot); // scan trash sub-directories
|
|
|
+ } catch (FileNotFoundException fnfe) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ long now = Time.now();
|
|
|
+ for (int i = 0; i < dirs.length; i++) {
|
|
|
+ Path path = dirs[i].getPath();
|
|
|
+ String dir = path.toUri().getPath();
|
|
|
+ String name = path.getName();
|
|
|
+ if (name.equals(CURRENT.getName())) { // skip current
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ long time;
|
|
|
+ try {
|
|
|
+ time = getTimeFromCheckpoint(name);
|
|
|
+ } catch (ParseException e) {
|
|
|
+ LOG.warn("Unexpected item in trash: "+dir+". Ignoring.");
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if ((now - deletionInterval) > time) {
|
|
|
+ if (fs.delete(path, true)) {
|
|
|
+ LOG.info("Deleted trash checkpoint: "+dir);
|
|
|
+ } else {
|
|
|
+ LOG.warn("Couldn't delete checkpoint: " + dir + " Ignoring.");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private long getTimeFromCheckpoint(String name) throws ParseException {
|
|
|
long time;
|
|
|
|