Преглед изворни кода

HADOOP-432. Add a trash feature, disabled by default.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@513988 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting пре 18 година
родитељ
комит
ca4b754141

+ 5 - 0
CHANGES.txt

@@ -199,6 +199,11 @@ Trunk (unreleased changes)
     once.  Large lists were causing datenodes to timeout.
     (Dhruba Borthakur via cutting) 
 
+62. HADOOP-432.  Add a trash feature, disabled by default.  When
+    enabled, the FSShell 'rm' command will move things to a trash
+    directory in the filesystem.  In HDFS, a thread periodically
+    checkpoints the trash and removes old checkpoints.  (cutting)
+
 
 Release 0.11.2 - 2007-02-16
 

+ 15 - 0
conf/hadoop-default.xml

@@ -106,6 +106,21 @@ creations/deletions), or "all".</description>
   determine the host, port, etc. for a filesystem.</description>
 </property>
 
+<property>
+  <name>fs.trash.root</name>
+  <value>${hadoop.tmp.dir}/Trash</value>
+  <description>The trash directory, used by FsShell's 'rm' command.
+  </description>
+</property>
+
+<property>
+  <name>fs.trash.interval</name>
+  <value>0</value>
+  <description>Number of minutes between trash checkpoints.
+  If zero, the trash feature is disabled.
+  </description>
+</property>
+
 <property>
   <name>fs.file.impl</name>
   <value>org.apache.hadoop.fs.LocalFileSystem</value>

+ 7 - 1
src/java/org/apache/hadoop/dfs/NameNode.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.dfs;
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
@@ -83,6 +84,7 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
 
     private FSNamesystem namesystem;
     private Server server;
+    private Thread emptier;
     private int handlerCount = 2;
     
     /** only used for testing purposes  */
@@ -178,6 +180,10 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
       this.server = RPC.getServer(this, hostname, port, handlerCount, 
                                   false, conf);
       this.server.start();      
+
+      this.emptier = new Thread(new Trash(conf).getEmptier(), "Trash Emptier");
+      this.emptier.setDaemon(true);
+      this.emptier.start();
     }
     
     /**
@@ -225,8 +231,8 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
       if (! stopRequested) {
         stopRequested = true;
         namesystem.close();
+        emptier.interrupt();
         server.stop();
-        //this.join();
       }
     }
 

+ 14 - 0
src/java/org/apache/hadoop/fs/FsShell.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.util.ToolBase;
 public class FsShell extends ToolBase {
 
     protected FileSystem fs;
+    private Trash trash;
 
     /**
      */
@@ -38,6 +39,7 @@ public class FsShell extends ToolBase {
     public void init() throws IOException {
         conf.setQuietMode(true);
         this.fs = FileSystem.get(conf);
+        this.trash = new Trash(conf);
     }
 
     /**
@@ -583,6 +585,10 @@ public class FsShell extends ToolBase {
                            "\", use -rmr instead");
       }
 
+      if (trash.moveToTrash(src)) {
+        System.out.println("Moved to trash: " + src);
+        return;
+      }
       if (fs.delete(src)) {
         System.out.println("Deleted " + src);
       } else {
@@ -590,6 +596,11 @@ public class FsShell extends ToolBase {
       }
     }
 
+    private void expunge() throws IOException {
+      trash.expunge();
+      trash.checkpoint();
+    }
+
     /**
      * Return an abbreviated English-language desc of the byte length
      */
@@ -737,6 +748,7 @@ public class FsShell extends ToolBase {
             System.err.println("           [-cp <src> <dst>]");
             System.err.println("           [-rm <path>]");
             System.err.println("           [-rmr <path>]");
+            System.err.println("           [-expunge]");
             System.err.println("           [-put <localsrc> <dst>]");
             System.err.println("           [-copyFromLocal <localsrc> <dst>]");
             System.err.println("           [-moveFromLocal <localsrc> <dst>]");
@@ -843,6 +855,8 @@ public class FsShell extends ToolBase {
                 exitCode = doall(cmd, argv, conf, i);
             } else if ("-rmr".equals(cmd)) {
                 exitCode = doall(cmd, argv, conf, i);
+            } else if ("-expunge".equals(cmd)) {
+                expunge();
             } else if ("-du".equals(cmd)) {
                 if (i < argv.length) {
                     exitCode = doall(cmd, argv, conf, i);

+ 214 - 0
src/java/org/apache/hadoop/fs/Trash.java

@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.text.*;
+import java.io.*;
+import java.net.URI;
+import java.util.Date;
+
+import org.apache.commons.logging.*;
+
+import org.apache.hadoop.conf.*;
+
+/** Provides a <i>trash</i> feature.  Files may be moved to a trash directory.
+ * They're initially stored in a <i>current</i> sub-directory of the trash
+ * directory.  Within that sub-directory their original path is preserved.
+ * Periodically one may checkpoint the current trash and remove older
+ * checkpoints.  (This design permits trash management without enumeration of
+ * the full trash content, without date support in the filesystem, and without
+ * clock synchronization.)
+ */
+public class Trash extends Configured {
+  private static final Log LOG =
+    LogFactory.getLog("org.apache.hadoop.fs.Trash");
+
+  private static final String CURRENT = "Current";
+  private static final DateFormat CHECKPOINT = new SimpleDateFormat("yyMMddHHmm");
+  private static final int MSECS_PER_MINUTE = 60*1000;
+
+  private FileSystem fs;
+  private Path root;
+  private Path current;
+  private long interval;
+
+  /** Construct a trash can accessor.
+   * @param conf a Configuration
+   */
+  public Trash(Configuration conf) throws IOException {
+    super(conf);
+
+    Path root = new Path(conf.get("fs.trash.root", "/tmp/Trash"));
+
+    this.fs = root.getFileSystem(conf);
+
+    if (!root.isAbsolute())
+      root = new Path(fs.getWorkingDirectory(), root);
+
+    this.root = root;
+    this.current = new Path(root, CURRENT);
+    this.interval = conf.getLong("fs.trash.interval", 60) * MSECS_PER_MINUTE;
+  }
+
+  /** Move a file or directory to the current trash directory.
+   * @return false if the item is already in the trash or trash is disabled
+   */ 
+  public boolean moveToTrash(Path path) throws IOException {
+    if (interval == 0)
+      return false;
+
+    if (!path.isAbsolute())                       // make path absolute
+      path = new Path(fs.getWorkingDirectory(), path);
+
+    if (!fs.exists(path))                         // check that path exists
+      throw new FileNotFoundException(path.toString());
+
+    URI rootUri = root.toUri();
+    String dirPath = path.toUri().getPath();
+
+    if (dirPath.startsWith(rootUri.getPath())) {  // already in trash
+      return false;
+    }
+
+    Path trashPath =                              // create path in current
+      new Path(rootUri.getScheme(), rootUri.getAuthority(),
+               current.toUri().getPath()+dirPath);
+
+    IOException cause = null;
+    
+    // try twice, in case checkpoint between the mkdirs() & rename()
+    for (int i = 0; i < 2; i++) {
+      Path trashDir = trashPath.getParent();
+      if (!fs.mkdirs(trashDir)) {                 // make parent directory
+        throw new IOException("Failed to create trash directory: "+trashDir);
+      }
+      try {
+        if (fs.rename(path, trashPath))           // move to current trash
+          return true;
+      } catch (IOException e) {
+        cause = e;
+      }
+    }
+    throw (IOException)
+      new IOException("Failed to move to trash: "+path).initCause(cause);
+  }
+
+  /** Create a trash checkpoint. */
+  public void checkpoint() throws IOException {
+    if (!fs.exists(current))                      // no trash, no checkpoint
+      return;
+
+    Path checkpoint;
+    synchronized (CHECKPOINT) {
+      checkpoint = new Path(root, CHECKPOINT.format(new Date()));
+    }
+
+    if (fs.rename(current, checkpoint)) {
+      LOG.info("Created trash checkpoint: "+checkpoint);
+    } else {
+      throw new IOException("Failed to checkpoint trash: "+checkpoint);
+    }
+  }
+
+  /** Delete old checkpoints. */
+  public void expunge() throws IOException {
+    Path[] dirs = fs.listPaths(root);             // scan trash sub-directories
+    long now = System.currentTimeMillis();
+    for (int i = 0; i < dirs.length; i++) {
+      Path dir = dirs[i];
+      String name = dir.getName();
+      if (name.equals(CURRENT))                   // skip current
+        continue;
+
+      long time;
+      try {
+        synchronized (CHECKPOINT) {
+          time = CHECKPOINT.parse(name).getTime();
+        }
+      } catch (ParseException e) {
+        LOG.warn("Unexpected item in trash: "+dir+". Ignoring.");
+        continue;
+      }
+
+      if ((now - interval) > time) {
+        if (fs.delete(dir)) {
+          LOG.info("Deleted trash checkpoint: "+dir);
+        } else {
+          LOG.warn("Couldn't delete checkpoint: "+dir+" Ignoring.");
+        }
+      }
+    }
+  }
+
+  /** Return a {@link Runnable} that periodically empties the trash.
+   * Only one checkpoint is kept at a time.
+   */
+  public Runnable getEmptier() {
+    return new Emptier();
+  }
+
+  private class Emptier implements Runnable {
+
+    public void run() {
+      if (interval == 0)
+        return;                                   // trash disabled
+
+      long now = System.currentTimeMillis();
+      long end = ceiling(now, interval);
+      while (true) {
+        try {                                     // sleep for interval
+          Thread.sleep(end - now);
+        } catch (InterruptedException e) {
+          return;                                 // exit on interrupt
+        }
+          
+        now = System.currentTimeMillis();
+        if (now >= end) {
+
+          try {
+            expunge();
+          } catch (IOException e) {
+            LOG.warn("Trash expunge caught: "+e+". Ignoring.");
+          }
+
+          try {
+            checkpoint();
+          } catch (IOException e) {
+            LOG.warn("Trash checkpoint caught: "+e+". Ignoring.");
+          }
+
+          end = ceiling(now, interval);
+        }
+      }
+    }
+
+    private long ceiling(long time, long interval) {
+      return floor(time, interval) + interval;
+    }
+    private long floor(long time, long interval) {
+      return (time / interval) * interval;
+    }
+
+  }
+
+  /** Run an emptier.*/
+  public static void main(String[] args) throws Exception {
+    new Trash(new Configuration()).getEmptier().run();
+  }
+
+}