Explorar o código

HADOOP-1569. Fixes DistCP to use the standard FileSystem interface.
Contributed by Chris Douglas.


git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@573166 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley hai 18 anos
pai
achega
41cc1fc9e9

+ 3 - 0
CHANGES.txt

@@ -163,6 +163,9 @@ Trunk (unreleased changes)
     HADOOP-1425.  Replace uses of ToolBase with the Tool interface.
     (Enis Soztutar via cutting)
 
+    HADOOP-1569.  Reimplement DistCP to use the standard FileSystem/URI
+    code in Hadoop so that you can copy from and to all of the supported file 
+    systems.(Chris Douglas via omalley)
 
 Release 0.14.1 - 2007-09-04
 

A diferenza do arquivo foi suprimida porque é demasiado grande
+ 300 - 653
src/java/org/apache/hadoop/util/CopyFiles.java


+ 159 - 32
src/test/org/apache/hadoop/fs/TestCopyFiles.java

@@ -50,12 +50,13 @@ public class TestCopyFiles extends TestCase {
     private static String[] dirNames = {
       "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine"
     };
-    private String name = "";
-    private int size;
-    private long seed;
-    
+    private final String name;
+    private int size = 0;
+    private long seed = 0L;
+
     MyFile() {
       int nLevels = gen.nextInt(MAX_LEVELS);
+      String xname = "";
       if (nLevels != 0) {
         int[] levels = new int[nLevels];
         for (int idx = 0; idx < nLevels; idx++) {
@@ -66,20 +67,23 @@ public class TestCopyFiles extends TestCase {
           sb.append(dirNames[levels[idx]]);
           sb.append("/");
         }
-        name = sb.toString();
+        xname = sb.toString();
       }
-      long fidx = -1;
-      while (fidx < 0) { fidx = gen.nextLong(); }
-      name = name + Long.toString(fidx);
-      size = gen.nextInt(MAX_SIZE);
-      seed = gen.nextLong();
+      long fidx = gen.nextLong() & Long.MAX_VALUE;
+      name = xname + Long.toString(fidx);
+      reset();
+    }
+    void reset() {
+      final int oldsize = size;
+      do { size = gen.nextInt(MAX_SIZE); } while (oldsize == size);
+      final long oldseed = seed;
+      do { seed = gen.nextLong() & Long.MAX_VALUE; } while (oldseed == seed);
     }
-    
     String getName() { return name; }
     int getSize() { return size; }
     long getSeed() { return seed; }
   }
-  
+
   public TestCopyFiles(String testName) {
     super(testName);
   }
@@ -126,7 +130,7 @@ public class TestCopyFiles extends TestCase {
     
     return files;
   }
-  
+
   /** check if the files have been copied correctly. */
   private static boolean checkFiles(String fsname, String topdir, MyFile[] files) 
     throws IOException {
@@ -155,7 +159,67 @@ public class TestCopyFiles extends TestCase {
     
     return true;
   }
-  
+
+  private static void updateFiles(String fsname, String topdir, MyFile[] files,
+        int nupdate) throws IOException {
+    assert nupdate <= NFILES;
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getNamed(fsname, conf);
+    Path root = new Path(topdir);
+
+    for (int idx = 0; idx < nupdate; ++idx) {
+      Path fPath = new Path(root, files[idx].getName());
+      // overwrite file
+      assertTrue(fPath.toString() + " does not exist", fs.exists(fPath));
+      FSDataOutputStream out = fs.create(fPath);
+      files[idx].reset();
+      byte[] toWrite = new byte[files[idx].getSize()];
+      Random rb = new Random(files[idx].getSeed());
+      rb.nextBytes(toWrite);
+      out.write(toWrite);
+      out.close();
+    }
+  }
+
+  private static FileStatus[] getFileStatus(String namenode,
+      String topdir, MyFile[] files) throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getNamed(namenode, conf);
+    Path root = new Path(topdir);
+    FileStatus[] ret = new FileStatus[NFILES];
+    for (int idx = 0; idx < NFILES; ++idx) {
+      ret[idx] = fs.getFileStatus(new Path(root, files[idx].getName()));
+    }
+    return ret;
+  }
+
+  private static boolean checkUpdate(FileStatus[] old, String namenode,
+      String topdir, MyFile[] upd, final int nupdate) throws IOException {
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getNamed(namenode, conf);
+    Path root = new Path(topdir);
+
+    // overwrote updated files
+    for (int idx = 0; idx < nupdate; ++idx) {
+      final FileStatus stat =
+        fs.getFileStatus(new Path(root, upd[idx].getName()));
+      if (stat.getModificationTime() <= old[idx].getModificationTime()) {
+        return false;
+      }
+    }
+    // did not overwrite files not updated
+    for (int idx = nupdate; idx < NFILES; ++idx) {
+      final FileStatus stat =
+        fs.getFileStatus(new Path(root, upd[idx].getName()));
+      if (stat.getModificationTime() != old[idx].getModificationTime()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   /** delete directory and everything underneath it.*/
   private static void deldir(String fsname, String topdir)
     throws IOException {
@@ -169,8 +233,8 @@ public class TestCopyFiles extends TestCase {
   public void testCopyFromLocalToLocal() throws Exception {
     MyFile[] files = createFiles("local", TEST_ROOT_DIR+"/srcdat");
     ToolRunner.run(new CopyFiles(new Configuration()),
-                           new String[] {"file://"+TEST_ROOT_DIR+"/srcdat",
-                                         "file://"+TEST_ROOT_DIR+"/destdat"});
+                           new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
+                                         "file:///"+TEST_ROOT_DIR+"/destdat"});
     assertTrue("Source and destination directories do not match.",
                checkFiles("local", TEST_ROOT_DIR+"/destdat", files));
     deldir("local", TEST_ROOT_DIR+"/destdat");
@@ -187,14 +251,15 @@ public class TestCopyFiles extends TestCase {
       namenode = conf.get("fs.default.name", "local");
       if (!"local".equals(namenode)) {
         MyFile[] files = createFiles(namenode, "/srcdat");
-        ToolRunner.run(new CopyFiles(conf), new String[] {"hdfs://"+namenode+"/srcdat",
-                                                   "hdfs://"+namenode+"/destdat",
-                                                   "-log",
-                                                   "hdfs://"+namenode+"/logs"});
+        ToolRunner.run(new CopyFiles(conf), new String[] {
+                                         "-log",
+                                         "hdfs://"+namenode+"/logs",
+                                         "hdfs://"+namenode+"/srcdat",
+                                         "hdfs://"+namenode+"/destdat"});
         assertTrue("Source and destination directories do not match.",
                    checkFiles(namenode, "/destdat", files));
         FileSystem fs = FileSystem.get(URI.create("hdfs://"+namenode+"/logs"), conf);
-        assertTrue("Log directory doesnot exist.",
+        assertTrue("Log directory does not exist.",
                     fs.exists(new Path("hdfs://"+namenode+"/logs")));
         deldir(namenode, "/destdat");
         deldir(namenode, "/srcdat");
@@ -215,14 +280,15 @@ public class TestCopyFiles extends TestCase {
       namenode = conf.get("fs.default.name", "local");
       if (!"local".equals(namenode)) {
         MyFile[] files = createFiles("local", TEST_ROOT_DIR+"/srcdat");
-        ToolRunner.run(new CopyFiles(conf), new String[] {"file://"+TEST_ROOT_DIR+"/srcdat",
-                                                   "hdfs://"+namenode+"/destdat",
-                                                   "-log",
-                                                   "hdfs://"+namenode+"/logs"});
+        ToolRunner.run(new CopyFiles(conf), new String[] {
+                                         "-log",
+                                         "hdfs://"+namenode+"/logs",
+                                         "file:///"+TEST_ROOT_DIR+"/srcdat",
+                                         "hdfs://"+namenode+"/destdat"});
         assertTrue("Source and destination directories do not match.",
                    checkFiles(namenode, "/destdat", files));
         FileSystem fs = FileSystem.get(URI.create("hdfs://"+namenode+"/logs"), conf);
-        assertTrue("Log directory doesnot exist.",
+        assertTrue("Log directory does not exist.",
                     fs.exists(new Path("hdfs://"+namenode+"/logs")));
         deldir(namenode, "/destdat");
         deldir(namenode, "/logs");
@@ -243,14 +309,15 @@ public class TestCopyFiles extends TestCase {
       namenode = conf.get("fs.default.name", "local");
       if (!"local".equals(namenode)) {
         MyFile[] files = createFiles(namenode, "/srcdat");
-        ToolRunner.run(new CopyFiles(conf), new String[] {"hdfs://"+namenode+"/srcdat",
-                                                   "file://"+TEST_ROOT_DIR+"/destdat",
-                                                   "-log",
-                                                   "/logs"});
+        ToolRunner.run(new CopyFiles(conf), new String[] {
+                                         "-log",
+                                         "/logs",
+                                         "hdfs://"+namenode+"/srcdat",
+                                         "file:///"+TEST_ROOT_DIR+"/destdat"});
         assertTrue("Source and destination directories do not match.",
                    checkFiles("local", TEST_ROOT_DIR+"/destdat", files));
         FileSystem fs = FileSystem.get(URI.create("hdfs://"+namenode+"/logs"), conf);
-        assertTrue("Log directory doesnot exist.",
+        assertTrue("Log directory does not exist.",
                     fs.exists(new Path("/logs")));
         deldir("local", TEST_ROOT_DIR+"/destdat");
         deldir(namenode, "/logs");
@@ -260,5 +327,65 @@ public class TestCopyFiles extends TestCase {
       if (cluster != null) { cluster.shutdown(); }
     }
   }
-  
+
+  public void testCopyDfsToDfsUpdateOverwrite() throws Exception {
+    String namenode = null;
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      cluster = new MiniDFSCluster(conf, 2, true, null);
+      namenode = conf.get("fs.default.name", "local");
+      if (!"local".equals(namenode)) {
+        MyFile[] files = createFiles(namenode, "/srcdat");
+        ToolRunner.run(new CopyFiles(conf), new String[] {
+                                         "-p",
+                                         "-log",
+                                         "hdfs://"+namenode+"/logs",
+                                         "hdfs://"+namenode+"/srcdat",
+                                         "hdfs://"+namenode+"/destdat"});
+        assertTrue("Source and destination directories do not match.",
+                   checkFiles(namenode, "/destdat", files));
+        FileSystem fs = FileSystem.get(URI.create("hdfs://"+namenode+"/logs"), conf);
+        assertTrue("Log directory does not exist.",
+                    fs.exists(new Path("hdfs://"+namenode+"/logs")));
+
+        FileStatus[] dchkpoint = getFileStatus(namenode, "/destdat", files);
+        final int nupdate = NFILES>>2;
+        updateFiles(namenode, "/srcdat", files, nupdate);
+        deldir(namenode, "/logs");
+
+        ToolRunner.run(new CopyFiles(conf), new String[] {
+                                         "-p",
+                                         "-update",
+                                         "-log",
+                                         "hdfs://"+namenode+"/logs",
+                                         "hdfs://"+namenode+"/srcdat",
+                                         "hdfs://"+namenode+"/destdat"});
+        assertTrue("Source and destination directories do not match.",
+                   checkFiles(namenode, "/destdat", files));
+        assertTrue("Update failed to replicate all changes in src",
+                 checkUpdate(dchkpoint, namenode, "/destdat", files, nupdate));
+
+        deldir(namenode, "/logs");
+        ToolRunner.run(new CopyFiles(conf), new String[] {
+                                         "-p",
+                                         "-overwrite",
+                                         "-log",
+                                         "hdfs://"+namenode+"/logs",
+                                         "hdfs://"+namenode+"/srcdat",
+                                         "hdfs://"+namenode+"/destdat"});
+        assertTrue("Source and destination directories do not match.",
+                   checkFiles(namenode, "/destdat", files));
+        assertTrue("-overwrite didn't.",
+                 checkUpdate(dchkpoint, namenode, "/destdat", files, NFILES));
+
+        deldir(namenode, "/destdat");
+        deldir(namenode, "/srcdat");
+        deldir(namenode, "/logs");
+      }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
+  }
+
 }

Algúns arquivos non se mostraron porque demasiados arquivos cambiaron neste cambio