Browse Source

HADOOP-5227. Fix distcp so -update and -delete can be meaningfully combined. (Tsz Wo (Nicholas), SZE via cdouglas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20@761830 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 16 years ago
parent
commit
0a2101fc9d

+ 3 - 0
CHANGES.txt

@@ -840,6 +840,9 @@ Release 0.20.0 - Unreleased
     join back before scheduling new tasks. This fixes race conditions associated
     join back before scheduling new tasks. This fixes race conditions associated
     with greedy scheduling as was the case earlier. (Amar Kamat via ddas) 
     with greedy scheduling as was the case earlier. (Amar Kamat via ddas) 
 
 
+    HADOOP-5227. Fix distcp so -update and -delete can be meaningfully
+    combined. (Tsz Wo (Nicholas), SZE via cdouglas)
+
 Release 0.19.2 - Unreleased
 Release 0.19.2 - Unreleased
 
 
   BUG FIXES
   BUG FIXES

+ 35 - 18
src/test/org/apache/hadoop/fs/TestCopyFiles.java

@@ -32,19 +32,17 @@ import junit.framework.TestCase;
 
 
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.log4j.Level;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.tools.DistCp;
 import org.apache.hadoop.tools.DistCp;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Level;
 
 
 
 
 /**
 /**
@@ -62,7 +60,7 @@ public class TestCopyFiles extends TestCase {
   static final URI LOCAL_FS = URI.create("file:///");
   static final URI LOCAL_FS = URI.create("file:///");
   
   
   private static final Random RAN = new Random();
   private static final Random RAN = new Random();
-  private static final int NFILES = 20;
+  private static final int NFILES = 7;
   private static String TEST_ROOT_DIR =
   private static String TEST_ROOT_DIR =
     new Path(System.getProperty("test.build.data","/tmp"))
     new Path(System.getProperty("test.build.data","/tmp"))
     .toString().replace(' ', '+');
     .toString().replace(' ', '+');
@@ -573,7 +571,7 @@ public class TestCopyFiles extends TestCase {
       for (MyFile f : files) {
       for (MyFile f : files) {
         totsize += f.getSize();
         totsize += f.getSize();
       }
       }
-      JobConf job = mr.createJobConf();
+      Configuration job = mr.createJobConf();
       job.setLong("distcp.bytes.per.map", totsize / 3);
       job.setLong("distcp.bytes.per.map", totsize / 3);
       ToolRunner.run(new DistCp(job),
       ToolRunner.run(new DistCp(job),
           new String[] {"-m", "100",
           new String[] {"-m", "100",
@@ -611,6 +609,7 @@ public class TestCopyFiles extends TestCase {
       final String nnUri = FileSystem.getDefaultUri(conf).toString();
       final String nnUri = FileSystem.getDefaultUri(conf).toString();
       final FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
       final FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
       final DistCp distcp = new DistCp(conf);
       final DistCp distcp = new DistCp(conf);
+      final FsShell shell = new FsShell(conf);  
 
 
       final String srcrootdir =  "/src_root";
       final String srcrootdir =  "/src_root";
       final Path srcrootpath = new Path(srcrootdir); 
       final Path srcrootpath = new Path(srcrootdir); 
@@ -624,7 +623,10 @@ public class TestCopyFiles extends TestCase {
 
 
         ToolRunner.run(distcp,
         ToolRunner.run(distcp,
             new String[]{"-filelimit", ""+filelimit, nnUri+srcrootdir, nnUri+dstrootdir});
             new String[]{"-filelimit", ""+filelimit, nnUri+srcrootdir, nnUri+dstrootdir});
-        
+        String results = execCmd(shell, "-lsr", dstrootdir);
+        results = removePrefix(results, dstrootdir);
+        System.out.println("results=" +  results);
+
         FileStatus[] dststat = getFileStatus(fs, dstrootdir, files, true);
         FileStatus[] dststat = getFileStatus(fs, dstrootdir, files, true);
         assertEquals(filelimit, dststat.length);
         assertEquals(filelimit, dststat.length);
         deldir(fs, dstrootdir);
         deldir(fs, dstrootdir);
@@ -745,6 +747,7 @@ public class TestCopyFiles extends TestCase {
     }
     }
   }
   }
 
 
+  /** test -delete */
   public void testDelete() throws Exception {
   public void testDelete() throws Exception {
     final Configuration conf = new Configuration();
     final Configuration conf = new Configuration();
     MiniDFSCluster cluster = null;
     MiniDFSCluster cluster = null;
@@ -760,31 +763,45 @@ public class TestCopyFiles extends TestCase {
       final String srcrootdir = "/src_root";
       final String srcrootdir = "/src_root";
       final String dstrootdir = "/dst_root";
       final String dstrootdir = "/dst_root";
 
 
-      {//test -delete
+      {
+        //create source files
         createFiles(nnURI, srcrootdir);
         createFiles(nnURI, srcrootdir);
-        createFiles(nnURI, dstrootdir);
-        create(fs, new Path(dstrootdir, "foo"));
-        create(fs, new Path(dstrootdir, "foobar"));
-        
-        System.out.println("srcrootdir=" +  srcrootdir);
-        shell.run(new String[]{"-lsr", srcrootdir});
+        String srcresults = execCmd(shell, "-lsr", srcrootdir);
+        srcresults = removePrefix(srcresults, srcrootdir);
+        System.out.println("srcresults=" +  srcresults);
 
 
+        //create some files in dst
+        createFiles(nnURI, dstrootdir);
         System.out.println("dstrootdir=" +  dstrootdir);
         System.out.println("dstrootdir=" +  dstrootdir);
         shell.run(new String[]{"-lsr", dstrootdir});
         shell.run(new String[]{"-lsr", dstrootdir});
 
 
+        //run distcp
         ToolRunner.run(distcp,
         ToolRunner.run(distcp,
             new String[]{"-delete", "-update", "-log", "/log",
             new String[]{"-delete", "-update", "-log", "/log",
                          nnUri+srcrootdir, nnUri+dstrootdir});
                          nnUri+srcrootdir, nnUri+dstrootdir});
 
 
-        String srcresults = execCmd(shell, "-lsr", srcrootdir);
-        srcresults = removePrefix(srcresults, srcrootdir);
-        System.out.println("srcresults=" +  srcresults);
-
+        //make sure src and dst contains the same files
         String dstresults = execCmd(shell, "-lsr", dstrootdir);
         String dstresults = execCmd(shell, "-lsr", dstrootdir);
         dstresults = removePrefix(dstresults, dstrootdir);
         dstresults = removePrefix(dstresults, dstrootdir);
-        System.out.println("dstresults=" +  dstresults);
+        System.out.println("first dstresults=" +  dstresults);
+        assertEquals(srcresults, dstresults);
+
+        //create additional file in dst
+        create(fs, new Path(dstrootdir, "foo"));
+        create(fs, new Path(dstrootdir, "foobar"));
+
+        //run distcp again
+        ToolRunner.run(distcp,
+            new String[]{"-delete", "-update", "-log", "/log2",
+                         nnUri+srcrootdir, nnUri+dstrootdir});
         
         
+        //make sure src and dst contains the same files
+        dstresults = execCmd(shell, "-lsr", dstrootdir);
+        dstresults = removePrefix(dstresults, dstrootdir);
+        System.out.println("second dstresults=" +  dstresults);
         assertEquals(srcresults, dstresults);
         assertEquals(srcresults, dstresults);
+
+        //cleanup
         deldir(fs, dstrootdir);
         deldir(fs, dstrootdir);
         deldir(fs, srcrootdir);
         deldir(fs, srcrootdir);
       }
       }

+ 23 - 26
src/tools/org/apache/hadoop/tools/DistCp.java

@@ -1039,10 +1039,8 @@ public class DistCp implements Tool {
       (args.srcs.size() == 1 && !dstExists) || update || overwrite;
       (args.srcs.size() == 1 && !dstExists) || update || overwrite;
     int srcCount = 0, cnsyncf = 0, dirsyn = 0;
     int srcCount = 0, cnsyncf = 0, dirsyn = 0;
     long fileCount = 0L, byteCount = 0L, cbsyncs = 0L;
     long fileCount = 0L, byteCount = 0L, cbsyncs = 0L;
-    boolean exceededlimit = false;
     try {
     try {
-      for(Iterator<Path> srcItr = args.srcs.iterator();
-          !exceededlimit && srcItr.hasNext(); ) {
+      for(Iterator<Path> srcItr = args.srcs.iterator(); srcItr.hasNext(); ) {
         final Path src = srcItr.next();
         final Path src = srcItr.next();
         FileSystem srcfs = src.getFileSystem(conf);
         FileSystem srcfs = src.getFileSystem(conf);
         FileStatus srcfilestat = srcfs.getFileStatus(src);
         FileStatus srcfilestat = srcfs.getFileStatus(src);
@@ -1052,10 +1050,10 @@ public class DistCp implements Tool {
         }
         }
 
 
         Stack<FileStatus> pathstack = new Stack<FileStatus>();
         Stack<FileStatus> pathstack = new Stack<FileStatus>();
-        for(pathstack.push(srcfilestat); !exceededlimit && !pathstack.empty(); ) {
+        for(pathstack.push(srcfilestat); !pathstack.empty(); ) {
           FileStatus cur = pathstack.pop();
           FileStatus cur = pathstack.pop();
           FileStatus[] children = srcfs.listStatus(cur.getPath());
           FileStatus[] children = srcfs.listStatus(cur.getPath());
-          for(int i = 0; !exceededlimit && i < children.length; i++) {
+          for(int i = 0; i < children.length; i++) {
             boolean skipfile = false;
             boolean skipfile = false;
             final FileStatus child = children[i]; 
             final FileStatus child = children[i]; 
             final String dst = makeRelative(root, child.getPath());
             final String dst = makeRelative(root, child.getPath());
@@ -1067,37 +1065,36 @@ public class DistCp implements Tool {
             else {
             else {
               //skip file if the src and the dst files are the same.
               //skip file if the src and the dst files are the same.
               skipfile = update && sameFile(srcfs, child, dstfs, new Path(args.dst, dst));
               skipfile = update && sameFile(srcfs, child, dstfs, new Path(args.dst, dst));
-              
+              //skip file if it exceed file limit or size limit
+              skipfile |= fileCount == args.filelimit
+                          || byteCount + child.getLen() > args.sizelimit; 
+
               if (!skipfile) {
               if (!skipfile) {
                 ++fileCount;
                 ++fileCount;
                 byteCount += child.getLen();
                 byteCount += child.getLen();
-  
-                exceededlimit |= fileCount > args.filelimit
-                                 || byteCount > args.sizelimit;
-
-                if (!exceededlimit) {
-                  if (LOG.isTraceEnabled()) {
-                    LOG.trace("adding file " + child.getPath());
-                  }
-
-                  ++cnsyncf;
-                  cbsyncs += child.getLen();
-                  if (cnsyncf > SYNC_FILE_MAX || cbsyncs > BYTES_PER_MAP) {
-                    src_writer.sync();
-                    dst_writer.sync();
-                    cnsyncf = 0;
-                    cbsyncs = 0L;
-                  }
+
+                if (LOG.isTraceEnabled()) {
+                  LOG.trace("adding file " + child.getPath());
+                }
+
+                ++cnsyncf;
+                cbsyncs += child.getLen();
+                if (cnsyncf > SYNC_FILE_MAX || cbsyncs > BYTES_PER_MAP) {
+                  src_writer.sync();
+                  dst_writer.sync();
+                  cnsyncf = 0;
+                  cbsyncs = 0L;
                 }
                 }
               }
               }
             }
             }
 
 
-            if (!skipfile && !exceededlimit) {
+            if (!skipfile) {
               src_writer.append(new LongWritable(child.isDir()? 0: child.getLen()),
               src_writer.append(new LongWritable(child.isDir()? 0: child.getLen()),
                   new FilePair(child, dst));
                   new FilePair(child, dst));
-              dst_writer.append(new Text(dst),
-                  new Text(child.getPath().toString()));
             }
             }
+
+            dst_writer.append(new Text(dst),
+                new Text(child.getPath().toString()));
           }
           }
 
 
           if (cur.isDir()) {
           if (cur.isDir()) {