Browse Source

HADOOP-1595. dfsshell may wait for a file to achieve its intended
replication target. (Tsz Wo (Nicholas), SZE via dhruba)



git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@565558 13f79535-47bb-0310-9956-ffa450edef68

Dhruba Borthakur 18 years ago
parent
commit
acbe28e2e1

+ 3 - 0
CHANGES.txt

@@ -44,6 +44,9 @@ Trunk (unreleased changes)
     HADOOP-1651.  Improve progress reporting.
     HADOOP-1651.  Improve progress reporting.
     (Devaraj Das via tomwhite)
     (Devaraj Das via tomwhite)
 
 
+    HADOOP-1595.  dfsshell can wait for a file to achieve its intended
+    replication target. (Tsz Wo (Nicholas), SZE via dhruba)
+
 Branch 0.14 (unreleased changes)
 Branch 0.14 (unreleased changes)
 
 
   1. HADOOP-1197.  In Configuration, deprecate getObject() and add
   1. HADOOP-1197.  In Configuration, deprecate getObject() and add

+ 128 - 43
src/java/org/apache/hadoop/fs/FsShell.java

@@ -35,9 +35,10 @@ public class FsShell extends ToolBase {
     new SimpleDateFormat("yyyy-MM-dd HH:mm");
     new SimpleDateFormat("yyyy-MM-dd HH:mm");
   protected static final SimpleDateFormat modifFmt =
   protected static final SimpleDateFormat modifFmt =
     new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-  {
+  static {
     modifFmt.setTimeZone(TimeZone.getTimeZone("UTC"));
     modifFmt.setTimeZone(TimeZone.getTimeZone("UTC"));
   }
   }
+  static final String SETREP_SHORT_USAGE="-setrep [-R] [-w] <rep> <path/file>";
   private static final DecimalFormat decimalFormat = 
   private static final DecimalFormat decimalFormat = 
     new DecimalFormat("#*0.0#*");
     new DecimalFormat("#*0.0#*");
 
 
@@ -225,7 +226,8 @@ public class FsShell extends ToolBase {
           ChecksumFileSystem csfs = (ChecksumFileSystem) srcFS;
           ChecksumFileSystem csfs = (ChecksumFileSystem) srcFS;
           File dstcs = FileSystem.getLocal(srcFS.getConf())
           File dstcs = FileSystem.getLocal(srcFS.getConf())
             .pathToFile(csfs.getChecksumFile(new Path(dst.getCanonicalPath())));
             .pathToFile(csfs.getChecksumFile(new Path(dst.getCanonicalPath())));
-          copyToLocal(srcFS, csfs.getChecksumFile(src), dstcs, false);
+          copyToLocal(csfs.getRawFileSystem(), csfs.getChecksumFile(src),
+                      dstcs, false);
         }
         }
       } else {
       } else {
         throw new IOException("When copying multiple files, "
         throw new IOException("When copying multiple files, "
@@ -305,6 +307,42 @@ public class FsShell extends ToolBase {
     }.process(srcf);
     }.process(srcf);
   }
   }
     
     
+  /**
+   * Parse the args of a command and check the format of args.
+   * <p>
+   * Every argument that starts with '-' is treated as an option and must be
+   * one of the options declared at construction time; every other argument
+   * is collected, in order, as a positional parameter.
+   */
+  static class CommandFormat {
+    /** Command name, as passed to the constructor. */
+    final String name;
+    /** Inclusive lower and upper bound on the number of parameters. */
+    final int minPar, maxPar;
+    /** Declared options (without the leading '-'); TRUE once parse() saw it. */
+    final Map<String, Boolean> options = new HashMap<String, Boolean>();
+
+    /**
+     * @param n command name
+     * @param min minimum number of positional parameters
+     * @param max maximum number of positional parameters
+     * @param possibleOpt accepted option names, without the leading '-'
+     */
+    private CommandFormat(String n, int min, int max, String ... possibleOpt) {
+      name = n;
+      minPar = min;
+      maxPar = max;
+      for(String opt : possibleOpt) {
+        options.put(opt, Boolean.FALSE);
+      }
+    }
+
+    /**
+     * Split args[pos..] into options and positional parameters.
+     * Recognized options are recorded in {@link #options}.
+     * @param args the full argument array
+     * @param pos index of the first argument to examine
+     * @return the positional parameters, in the order they appeared
+     * @throws IllegalArgumentException if an undeclared option is found or
+     *         the number of parameters is outside [minPar, maxPar]
+     */
+    List<String> parse(String[] args, int pos) {
+      List<String> parameters = new ArrayList<String>();
+      for(; pos < args.length; pos++) {
+        String arg = args[pos];
+        // startsWith() instead of charAt(0): an empty-string argument must
+        // not blow up with StringIndexOutOfBoundsException; it is simply a
+        // (probably invalid) parameter for the caller to reject.
+        if (arg.startsWith("-")) {
+          String opt = arg.substring(1);
+          if (!options.containsKey(opt)) {
+            throw new IllegalArgumentException("Illegal option " + arg);
+          }
+          options.put(opt, Boolean.TRUE);
+        } else {
+          parameters.add(arg);
+        }
+      }
+      int psize = parameters.size();
+      if (psize < minPar || psize > maxPar) {
+        throw new IllegalArgumentException("Illegal number of arguments");
+      }
+      return parameters;
+    }
+  }
+
   /**
   /**
    * Parse the incoming command string
    * Parse the incoming command string
    * @param cmd
    * @param cmd
@@ -312,31 +350,72 @@ public class FsShell extends ToolBase {
    * @throws IOException 
    * @throws IOException 
    */
    */
   private void setReplication(String[] cmd, int pos) throws IOException {
   private void setReplication(String[] cmd, int pos) throws IOException {
-    if (cmd.length-pos<2 || (cmd.length-pos==2 && cmd[pos].equalsIgnoreCase("-R"))) {
-      System.err.println("Usage: [-R] <repvalue> <path>");
-      throw new RuntimeException("Usage: [-R] <repvalue> <path>");
+    CommandFormat c = new CommandFormat("setrep", 2, 2, "R", "w");
+    String dst = null;
+    short rep = 0;
+
+    try {
+      List<String> parameters = c.parse(cmd, pos);
+      rep = Short.parseShort(parameters.get(0));
+      dst = parameters.get(1);
+    } catch (NumberFormatException nfe) {
+      System.err.println("Illegal replication, a positive integer expected");
+      throw nfe;
     }
     }
-      
-    boolean recursive = false;
-    short rep = 3;
-      
-    if ("-R".equalsIgnoreCase(cmd[pos])) {
-      recursive=true;
-      pos++;
-        
+    catch(IllegalArgumentException iae) {
+      System.err.println("Usage: java FsShell " + SETREP_SHORT_USAGE);
+      throw iae;
     }
     }
-      
-    try {
-      rep = Short.parseShort(cmd[pos]);
-      pos++;
-    } catch (NumberFormatException e) {
-      System.err.println("Cannot set replication to: " + cmd[pos]);
-      throw new RuntimeException("Cannot set replication to: " + cmd[pos]);
+
+    if (rep < 1) {
+      System.err.println("Cannot set replication to: " + rep);
+      throw new IllegalArgumentException("replication must be >= 1");
+    }
+
+    List<Path> waitList = c.options.get("w")? new ArrayList<Path>(): null;
+    setReplication(rep, dst, c.options.get("R"), waitList);
+
+    if (waitList != null) {
+      waitForReplication(waitList, rep);
     }
     }
-      
-    setReplication(rep, cmd[pos], recursive);
   }
   }
     
     
+  /**
+   * Wait for all files in waitList to have replication number equal to rep.
+   * <p>
+   * Each file is polled through fs.getFileCacheHints() every 10 seconds
+   * until every returned entry has exactly rep elements. Progress dots are
+   * printed to System.out while waiting.
+   * @param waitList The files to wait for.
+   * @param rep The new replication number.
+   * @throws IOException if the file system cannot be queried, or if the
+   *         thread is interrupted while sleeping between polls
+   */
+  void waitForReplication(List<Path> waitList, int rep) throws IOException {
+    for(Path f : waitList) {
+      System.out.print("Waiting for " + f + " ...");
+      System.out.flush();
+
+      boolean printWarning = false;
+      long len = fs.getFileStatus(f).getLen();
+
+      for(boolean done = false; !done; ) {
+        String[][] locations = fs.getFileCacheHints(f, 0, len);
+        done = true;
+        for(String[] hints : locations) {
+          if (hints.length != rep) {
+            done = false;
+            // The over-replication check has to look at the entry that
+            // failed the equality test; checking it only for entries that
+            // already equal rep could never trigger the warning.
+            if (!printWarning && hints.length > rep) {
+              System.out.println("\nWARNING: the waiting time may be long for "
+                  + "DECREASING the number of replication.");
+              printWarning = true;
+            }
+            break;
+          }
+        }
+
+        if (!done) {
+          System.out.print(".");
+          System.out.flush();
+          try {
+            Thread.sleep(10000);
+          } catch (InterruptedException e) {
+            // Restore the flag and stop waiting; silently continuing would
+            // make this unbounded polling loop impossible to cancel.
+            Thread.currentThread().interrupt();
+            throw new java.io.InterruptedIOException(
+                "Interrupted while waiting for " + f);
+          }
+        }
+      }
+
+      System.out.println(" done");
+    }
+  }
+
   /**
   /**
    * Set the replication for files that match file pattern <i>srcf</i>
    * Set the replication for files that match file pattern <i>srcf</i>
    * if it's a directory and recursive is true,
    * if it's a directory and recursive is true,
@@ -347,22 +426,22 @@ public class FsShell extends ToolBase {
    * @throws IOException  
    * @throws IOException  
    * @see org.apache.hadoop.fs.FileSystem#globPaths(Path)
    * @see org.apache.hadoop.fs.FileSystem#globPaths(Path)
    */
    */
-  void setReplication(short newRep, String srcf, boolean recursive)
+  void setReplication(short newRep, String srcf, boolean recursive,
+                      List<Path> waitingList)
     throws IOException {
     throws IOException {
     Path[] srcs = fs.globPaths(new Path(srcf));
     Path[] srcs = fs.globPaths(new Path(srcf));
     for(int i=0; i<srcs.length; i++) {
     for(int i=0; i<srcs.length; i++) {
-      setReplication(newRep, srcs[i], recursive);
+      setReplication(newRep, srcs[i], recursive, waitingList);
     }
     }
   }
   }
-    
-  private void setReplication(short newRep, Path src, boolean recursive)
+
+  private void setReplication(short newRep, Path src, boolean recursive,
+                              List<Path> waitingList)
     throws IOException {
     throws IOException {
-  	
-    if (!fs.isDirectory(src)) {
-      setFileReplication(src, newRep);
+    if (!fs.getFileStatus(src).isDir()) {
+      setFileReplication(src, newRep, waitingList);
       return;
       return;
     }
     }
-    	
     Path items[] = fs.listPaths(src);
     Path items[] = fs.listPaths(src);
     if (items == null) {
     if (items == null) {
       throw new IOException("Could not get listing for " + src);
       throw new IOException("Could not get listing for " + src);
@@ -370,10 +449,10 @@ public class FsShell extends ToolBase {
 
 
       for (int i = 0; i < items.length; i++) {
       for (int i = 0; i < items.length; i++) {
         Path cur = items[i];
         Path cur = items[i];
-        if (!fs.isDirectory(cur)) {
-          setFileReplication(cur, newRep);
+        if (!fs.getFileStatus(cur).isDir()) {
+          setFileReplication(cur, newRep, waitingList);
         } else if (recursive) {
         } else if (recursive) {
-          setReplication(newRep, cur, recursive);
+          setReplication(newRep, cur, recursive, waitingList);
         }
         }
       }
       }
     }
     }
@@ -386,9 +465,12 @@ public class FsShell extends ToolBase {
    * @param newRep: new replication factor
    * @param newRep: new replication factor
    * @throws IOException
    * @throws IOException
    */
    */
-  private void setFileReplication(Path file, short newRep) throws IOException {
-    	
+  private void setFileReplication(Path file, short newRep, List<Path> waitList)
+    throws IOException {
     if (fs.setReplication(file, newRep)) {
     if (fs.setReplication(file, newRep)) {
+      if (waitList != null) {
+        waitList.add(file);
+      }
       System.out.println("Replication " + newRep + " set: " + file);
       System.out.println("Replication " + newRep + " set: " + file);
     } else {
     } else {
       System.err.println("Could not set replication for: " + file);
       System.err.println("Could not set replication for: " + file);
@@ -849,8 +931,8 @@ public class FsShell extends ToolBase {
       "[-moveFromLocal <localsrc> <dst>] [-get <src> <localdst>]\n\t" +
       "[-moveFromLocal <localsrc> <dst>] [-get <src> <localdst>]\n\t" +
       "[-getmerge <src> <localdst> [addnl]] [-cat <src>]\n\t" +
       "[-getmerge <src> <localdst> [addnl]] [-cat <src>]\n\t" +
       "[-copyToLocal <src><localdst>] [-moveToLocal <src> <localdst>]\n\t" +
       "[-copyToLocal <src><localdst>] [-moveToLocal <src> <localdst>]\n\t" +
-      "[-mkdir <path>] [-report] [-setrep [-R] <rep> <path/file>]\n" +
-      "[-touchz <path>] [-test -[ezd] <path>] [-stat [format] <path>]\n" +
+      "[-mkdir <path>] [-report] [" + SETREP_SHORT_USAGE + "]\n\t" +
+      "[-touchz <path>] [-test -[ezd] <path>] [-stat [format] <path>]\n\t" +
       "[-help [cmd]]\n";
       "[-help [cmd]]\n";
 
 
     String conf ="-conf <configuration file>:  Specify an application configuration file.";
     String conf ="-conf <configuration file>:  Specify an application configuration file.";
@@ -935,9 +1017,10 @@ public class FsShell extends ToolBase {
         
         
     String mkdir = "-mkdir <path>: \tCreate a directory in specified location. \n";
     String mkdir = "-mkdir <path>: \tCreate a directory in specified location. \n";
 
 
-    String setrep = "-setrep [-R] <rep> <path/file>:  Set the replication level of a file. \n" +
-      "\t\tThe -R flag requests a recursive change of replication level \n" + 
-      "\t\tfor an entire tree.\n"; 
+    String setrep = SETREP_SHORT_USAGE
+      + ":  Set the replication level of a file. \n"
+      + "\t\tThe -R flag requests a recursive change of replication level \n"
+      + "\t\tfor an entire tree.\n";
 
 
     String touchz = "-touchz <path>: Write a timestamp in yyyy-MM-dd HH:mm:ss format\n" +
     String touchz = "-touchz <path>: Write a timestamp in yyyy-MM-dd HH:mm:ss format\n" +
       "\t\tin a file at <path>. An error is returned if the file exists with non-zero length\n";
       "\t\tin a file at <path>. An error is returned if the file exists with non-zero length\n";
@@ -1131,8 +1214,7 @@ public class FsShell extends ToolBase {
       System.err.println("Usage: java FsShell" + 
       System.err.println("Usage: java FsShell" + 
                          " [" + cmd + " <src> <localdst> [addnl]]");
                          " [" + cmd + " <src> <localdst> [addnl]]");
     } else if ("-setrep".equals(cmd)) {
     } else if ("-setrep".equals(cmd)) {
-      System.err.println("Usage: java FsShell" + 
-                         " [-setrep [-R] <rep> <path/file>]");
+      System.err.println("Usage: java FsShell [" + SETREP_SHORT_USAGE + "]");
     } else if ("-test".equals(cmd)) {
     } else if ("-test".equals(cmd)) {
       System.err.println("Usage: java FsShell" +
       System.err.println("Usage: java FsShell" +
                          " [-test -[ezd] <path>]");
                          " [-test -[ezd] <path>]");
@@ -1162,7 +1244,7 @@ public class FsShell extends ToolBase {
       System.err.println("           [-copyToLocal [-crc] <src> <localdst>]");
       System.err.println("           [-copyToLocal [-crc] <src> <localdst>]");
       System.err.println("           [-moveToLocal [-crc] <src> <localdst>]");
       System.err.println("           [-moveToLocal [-crc] <src> <localdst>]");
       System.err.println("           [-mkdir <path>]");
       System.err.println("           [-mkdir <path>]");
-      System.err.println("           [-setrep [-R] <rep> <path/file>]");
+      System.err.println("           [" + SETREP_SHORT_USAGE + "]");
       System.err.println("           [-touchz <path>]");
       System.err.println("           [-touchz <path>]");
       System.err.println("           [-test -[ezd] <path>]");
       System.err.println("           [-test -[ezd] <path>]");
       System.err.println("           [-stat [format] <path>]");
       System.err.println("           [-stat [format] <path>]");
@@ -1322,6 +1404,9 @@ public class FsShell extends ToolBase {
       exitCode = -1;
       exitCode = -1;
       System.err.println(cmd.substring(1) + ": " + 
       System.err.println(cmd.substring(1) + ": " + 
                          e.getLocalizedMessage());  
                          e.getLocalizedMessage());  
+    } catch (RuntimeException re) {
+      exitCode = -1;
+      System.err.println(cmd.substring(1) + ": " + re.getLocalizedMessage());  
     } finally {
     } finally {
     }
     }
     return exitCode;
     return exitCode;

+ 7 - 5
src/test/org/apache/hadoop/dfs/TestDFSShell.java

@@ -30,11 +30,11 @@ import org.apache.hadoop.util.StringUtils;
  * This class tests commands from DFSShell.
  * This class tests commands from DFSShell.
  */
  */
 public class TestDFSShell extends TestCase {
 public class TestDFSShell extends TestCase {
-  private static String TEST_ROOT_DIR =
+  static final String TEST_ROOT_DIR =
     new Path(System.getProperty("test.build.data","/tmp"))
     new Path(System.getProperty("test.build.data","/tmp"))
     .toString().replace(' ', '+');
     .toString().replace(' ', '+');
 
 
-  static private Path writeFile(FileSystem fs, Path f) throws IOException {
+  static Path writeFile(FileSystem fs, Path f) throws IOException {
     DataOutputStream out = fs.create(f);
     DataOutputStream out = fs.create(f);
     out.writeBytes("dhruba: " + f);
     out.writeBytes("dhruba: " + f);
     out.close();
     out.close();
@@ -42,14 +42,14 @@ public class TestDFSShell extends TestCase {
     return f;
     return f;
   }
   }
 
 
-  static private Path mkdir(FileSystem fs, Path p) throws IOException {
+  static Path mkdir(FileSystem fs, Path p) throws IOException {
     assertTrue(fs.mkdirs(p));
     assertTrue(fs.mkdirs(p));
     assertTrue(fs.exists(p));
     assertTrue(fs.exists(p));
     assertTrue(fs.getFileStatus(p).isDir());
     assertTrue(fs.getFileStatus(p).isDir());
     return p;
     return p;
   }
   }
 
 
-  static private File createLocalFile(File f) throws IOException {
+  static File createLocalFile(File f) throws IOException {
     assertTrue(!f.exists());
     assertTrue(!f.exists());
     PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(f)));
     PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(f)));
     out.println(f.getAbsolutePath());
     out.println(f.getAbsolutePath());
@@ -137,6 +137,8 @@ public class TestDFSShell extends TestCase {
       };
       };
       
       
       //use SecurityManager to pause the copying of f1 and begin copying f2
       //use SecurityManager to pause the copying of f1 and begin copying f2
+      SecurityManager sm = System.getSecurityManager();
+      System.out.println("SecurityManager = " + sm);
       System.setSecurityManager(new SecurityManager() {
       System.setSecurityManager(new SecurityManager() {
         private boolean firstTime = true;
         private boolean firstTime = true;
   
   
@@ -161,7 +163,7 @@ public class TestDFSShell extends TestCase {
       show("done");
       show("done");
   
   
       try {copy2ndFileThread.join();} catch (InterruptedException e) { }
       try {copy2ndFileThread.join();} catch (InterruptedException e) { }
-      System.setSecurityManager(null);
+      System.setSecurityManager(sm);
       f1.delete();
       f1.delete();
       f2.delete();
       f2.delete();
     } finally {
     } finally {

+ 28 - 0
src/test/org/apache/hadoop/dfs/TestSetrepDecreasing.java

@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+/**
+ * Runs the shared setrep scenario of {@link TestSetrepIncreasing} in the
+ * decreasing direction.
+ */
+public class TestSetrepDecreasing extends TestCase {
+  /**
+   * Calls TestSetrepIncreasing.setrep with an initial replication of 5 and
+   * a target replication of 3.
+   */
+  public void testSetrepDecreasing() throws IOException {
+    TestSetrepIncreasing.setrep(5, 3);
+  }
+}

+ 68 - 0
src/test/org/apache/hadoop/dfs/TestSetrepIncreasing.java

@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import junit.framework.TestCase;
+import java.io.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+
+/**
+ * Tests "-setrep -w" from FsShell against a MiniDFSCluster when the target
+ * replication is larger than the initial one. The static helper is shared
+ * with TestSetrepDecreasing.
+ */
+public class TestSetrepIncreasing extends TestCase {
+  /**
+   * Start a 10-datanode MiniDFSCluster with dfs.replication = fromREP,
+   * write one file, run "-setrep -w toREP" on it through FsShell and check
+   * that every entry returned by getFileCacheHints() has toREP elements.
+   * @param fromREP replication the file is created with
+   * @param toREP replication requested through -setrep
+   * @throws IOException if a file system operation fails
+   */
+  static void setrep(int fromREP, int toREP) throws IOException {
+    Configuration conf = new Configuration();
+    conf.set("dfs.replication", "" + fromREP);
+    conf.setLong("dfs.blockreport.intervalMsec", 1000L);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 10, true, null);
+    FileSystem fs = null;
+
+    // Everything after cluster start-up runs inside the try so that the
+    // cluster is shut down even if an early call or assertion fails.
+    try {
+      fs = cluster.getFileSystem();
+      assertTrue("Not a HDFS: "+fs.getUri(),
+          fs instanceof DistributedFileSystem);
+
+      Path root = TestDFSShell.mkdir(fs, 
+          new Path("/test/setrep" + fromREP + "-" + toREP));
+      Path f = TestDFSShell.writeFile(fs, new Path(root, "foo"));
+      
+      // Verify setrep for changing replication
+      {
+        String[] args = {"-setrep", "-w", "" + toREP, "" + f};
+        FsShell shell = new FsShell();
+        shell.setConf(conf);
+        try {
+          assertEquals(0, shell.run(args));
+        } catch (Exception e) {
+          fail("-setrep " + e);
+        }
+      }
+
+      //get fs again since the old one may be closed
+      fs = cluster.getFileSystem();
+      long len = fs.getFileStatus(f).getLen();
+      for(String[] locations : fs.getFileCacheHints(f, 0, len)) {
+        assertEquals(toREP, locations.length);
+      }
+      TestDFSShell.show("done setrep waiting: " + root);
+    } finally {
+      if (fs != null) {
+        // best-effort close; the cluster is shut down right after
+        try {fs.close();} catch (Exception e) {}
+      }
+      cluster.shutdown();
+    }
+  }
+
+  /** Raise the replication of a file from 3 to 7 and wait for it. */
+  public void testSetrepIncreasing() throws IOException {
+    setrep(3, 7);
+  }
+}