Pārlūkot izejas kodu

HADOOP-2725. Roll back r620055 due to a failing test case.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@620071 13f79535-47bb-0310-9956-ffa450edef68
Christopher Douglas 17 gadi atpakaļ
vecāks
revīzija
4bd619c715

+ 0 - 3
CHANGES.txt

@@ -45,9 +45,6 @@ Release 0.16.1 - Unrelease
     dfs.umask to set the umask that is used by DFS.
     dfs.umask to set the umask that is used by DFS.
     (Tsz Wo (Nicholas), SZE via dhruba)
     (Tsz Wo (Nicholas), SZE via dhruba)
 
 
-    HADOOP-2725. Fix distcp to avoid leaving truncated files, particularly
-    in the default case. (Tsz Wo (Nicholas), SZE via cdouglas)
-
 Release 0.16.0 - 2008-02-07
 Release 0.16.0 - 2008-02-07
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 2 - 2
src/java/org/apache/hadoop/io/SequenceFile.java

@@ -757,7 +757,7 @@ public class SequenceFile {
   }
   }
   
   
   /** Write key/value pairs to a sequence-format file. */
   /** Write key/value pairs to a sequence-format file. */
-  public static class Writer implements java.io.Closeable {
+  public static class Writer {
     /**
     /**
      * A global compressor pool used to save the expensive 
      * A global compressor pool used to save the expensive 
      * construction/destruction of (possibly native) compression codecs.
      * construction/destruction of (possibly native) compression codecs.
@@ -1316,7 +1316,7 @@ public class SequenceFile {
   } // BlockCompressionWriter
   } // BlockCompressionWriter
   
   
   /** Reads key/value pairs from a sequence-format file. */
   /** Reads key/value pairs from a sequence-format file. */
-  public static class Reader implements java.io.Closeable {
+  public static class Reader {
     /**
     /**
      * A global decompressor pool used to save the expensive 
      * A global decompressor pool used to save the expensive 
      * construction/destruction of (possibly native) decompression codecs.
      * construction/destruction of (possibly native) decompression codecs.

+ 185 - 274
src/java/org/apache/hadoop/util/CopyFiles.java

@@ -25,7 +25,17 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.InputStreamReader;
 import java.text.DecimalFormat;
 import java.text.DecimalFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Stack;
+import java.util.StringTokenizer;
 
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
@@ -60,10 +70,8 @@ import org.apache.hadoop.mapred.SequenceFileRecordReader;
 public class CopyFiles implements Tool {
 public class CopyFiles implements Tool {
   private static final Log LOG = LogFactory.getLog(CopyFiles.class);
   private static final Log LOG = LogFactory.getLog(CopyFiles.class);
 
 
-  private static final String NAME = "distcp";
-
-  private static final String usage = NAME
-    + " [OPTIONS] <srcurl>* <desturl>" +
+  private static final String usage =
+    "distcp [OPTIONS] <srcurl>* <desturl>" +
     "\n\nOPTIONS:" +
     "\n\nOPTIONS:" +
     "\n-p                     Preserve status" +
     "\n-p                     Preserve status" +
     "\n-i                     Ignore failures" +
     "\n-i                     Ignore failures" +
@@ -74,7 +82,7 @@ public class CopyFiles implements Tool {
     "\n\nNOTE: if -overwrite or -update are set, each source URI is " +
     "\n\nNOTE: if -overwrite or -update are set, each source URI is " +
     "\n      interpreted as an isomorphic update to an existing directory." +
     "\n      interpreted as an isomorphic update to an existing directory." +
     "\nFor example:" +
     "\nFor example:" +
-    "\nhadoop " + NAME + " -p -update \"hdfs://A:8020/user/foo/bar\" " +
+    "\nhadoop distcp -p -update \"hdfs://A:8020/user/foo/bar\" " +
     "\"hdfs://B:8020/user/foo/baz\"\n" +
     "\"hdfs://B:8020/user/foo/baz\"\n" +
     "\n     would update all descendants of 'baz' also in 'bar'; it would " +
     "\n     would update all descendants of 'baz' also in 'bar'; it would " +
     "\n     *not* update /user/foo/baz/bar\n";
     "\n     *not* update /user/foo/baz/bar\n";
@@ -84,10 +92,6 @@ public class CopyFiles implements Tool {
   private static final int SYNC_FILE_MAX = 10;
   private static final int SYNC_FILE_MAX = 10;
 
 
   static enum Counter { COPY, SKIP, FAIL, BYTESCOPIED, BYTESEXPECTED }
   static enum Counter { COPY, SKIP, FAIL, BYTESCOPIED, BYTESEXPECTED }
-  static enum Options {IGNORE_READ_FAILURES, PRESERVE_STATUS, OVERWRITE, UPDATE}
-
-  static final String TMP_DIR_LABEL = NAME + ".tmp.dir";
-  static final String DST_DIR_LABEL = NAME + ".dest.path";
 
 
   private JobConf conf;
   private JobConf conf;
 
 
@@ -103,6 +107,9 @@ public class CopyFiles implements Tool {
     return conf;
     return conf;
   }
   }
 
 
+  @Deprecated
+  public CopyFiles() { }
+
   public CopyFiles(Configuration conf) {
   public CopyFiles(Configuration conf) {
     setConf(conf);
     setConf(conf);
   }
   }
@@ -132,7 +139,7 @@ public class CopyFiles implements Tool {
    * InputFormat of a distcp job responsible for generating splits of the src
    * InputFormat of a distcp job responsible for generating splits of the src
    * file list.
    * file list.
    */
    */
-  static class CopyInputFormat implements InputFormat<Text, Text> {
+  static class CopyInputFormat implements InputFormat {
 
 
     /**
     /**
      * Does nothing.
      * Does nothing.
@@ -190,12 +197,22 @@ public class CopyFiles implements Tool {
     /**
     /**
      * Returns a reader for this split of the src file list.
      * Returns a reader for this split of the src file list.
      */
      */
-    public RecordReader<Text, Text> getRecordReader(InputSplit split,
-        JobConf job, Reporter reporter) throws IOException {
-      return new SequenceFileRecordReader<Text, Text>(job, (FileSplit)split);
+    public RecordReader getRecordReader(InputSplit split, JobConf job,
+                                 Reporter reporter) throws IOException {
+      return new SequenceFileRecordReader(job, (FileSplit)split);
     }
     }
   }
   }
 
 
+  /**
+   * Return true if dst should be replaced by src and the update flag is set.
+   * Right now, this merely checks that the src and dst len are not equal. This
+   * should be improved on once modification times, CRCs, etc. can
+   * be meaningful in this context.
+   */
+  private static boolean needsUpdate(FileStatus src, FileStatus dst) {
+    return src.getLen() != dst.getLen();
+  }
+
   /**
   /**
    * FSCopyFilesMapper: The mapper for copying files between FileSystems.
    * FSCopyFilesMapper: The mapper for copying files between FileSystems.
    */
    */
@@ -223,15 +240,6 @@ public class CopyFiles implements Tool {
                     " Failed: " + failcount);
                     " Failed: " + failcount);
     }
     }
 
 
-    /**
-     * Return true if dst should be replaced by src and the update flag is set.
-     * Right now, this merely checks that the src and dst len are not equal. 
-     * This should be improved on once modification times, CRCs, etc. can
-     * be meaningful in this context.
-     */
-    private boolean needsUpdate(FileStatus src, FileStatus dst) {
-      return update && src.getLen() != dst.getLen();
-    }
 
 
     /**
     /**
      * Copy a file to a destination.
      * Copy a file to a destination.
@@ -239,22 +247,17 @@ public class CopyFiles implements Tool {
      * @param dstpath dst path
      * @param dstpath dst path
      * @param reporter
      * @param reporter
      */
      */
-    private void copy(FileStatus srcstat, Path relativedst,
+    private void copy(FileStatus srcstat, Path dstpath,
         OutputCollector<WritableComparable, Text> outc, Reporter reporter)
         OutputCollector<WritableComparable, Text> outc, Reporter reporter)
         throws IOException {
         throws IOException {
-      Path absdst = new Path(destPath, relativedst);
+
       int totfiles = job.getInt("distcp.file.count", -1);
       int totfiles = job.getInt("distcp.file.count", -1);
       assert totfiles >= 0 : "Invalid file count " + totfiles;
       assert totfiles >= 0 : "Invalid file count " + totfiles;
 
 
       // if a directory, ensure created even if empty
       // if a directory, ensure created even if empty
       if (srcstat.isDir()) {
       if (srcstat.isDir()) {
-        if (destFileSys.exists(absdst)) {
-          if (!destFileSys.getFileStatus(absdst).isDir()) {
-            throw new IOException("Failed to mkdirs: " + absdst+" is a file.");
-          }
-        }
-        else if (!destFileSys.mkdirs(absdst)) {
-          throw new IOException("Failed to mkdirs " + absdst);
+        if (!destFileSys.mkdirs(dstpath)) {
+          throw new IOException("Failed to create" + dstpath);
         }
         }
         // TODO: when modification times can be set, directories should be
         // TODO: when modification times can be set, directories should be
         // emitted to reducers so they might be preserved. Also, mkdirs does
         // emitted to reducers so they might be preserved. Also, mkdirs does
@@ -262,87 +265,70 @@ public class CopyFiles implements Tool {
         // if this changes, all directory work might as well be done in reduce
         // if this changes, all directory work might as well be done in reduce
         return;
         return;
       }
       }
-
-      if (destFileSys.exists(absdst) && !overwrite
-          && !needsUpdate(srcstat, destFileSys.getFileStatus(absdst))) {
-        outc.collect(null, new Text("SKIP: " + srcstat.getPath()));
-        ++skipcount;
-        reporter.incrCounter(Counter.SKIP, 1);
-        updateStatus(reporter);
-        return;
+      Path destParent = dstpath.getParent();
+      if (totfiles > 1) {
+        // create directories to hold destination file
+        if (destParent != null && !destFileSys.mkdirs(destParent)) {
+          throw new IOException("mkdirs failed to create " + destParent);
+        }
+      } else {
+        // Copying a single file; use dst path provided by user as destination
+        // rather than destination directory
+        dstpath = destParent;
       }
       }
 
 
-      Path tmpfile = new Path(job.get(TMP_DIR_LABEL), relativedst);
       long cbcopied = 0L;
       long cbcopied = 0L;
       FSDataInputStream in = null;
       FSDataInputStream in = null;
       FSDataOutputStream out = null;
       FSDataOutputStream out = null;
       try {
       try {
+        if (destFileSys.exists(dstpath)
+           && (!overwrite && !(update
+               && needsUpdate(srcstat, destFileSys.getFileStatus(dstpath))))) {
+          outc.collect(null, new Text("SKIP: " + srcstat.getPath()));
+          ++skipcount;
+          reporter.incrCounter(Counter.SKIP, 1);
+          updateStatus(reporter);
+          return;
+        }
         // open src file
         // open src file
         in = srcstat.getPath().getFileSystem(job).open(srcstat.getPath());
         in = srcstat.getPath().getFileSystem(job).open(srcstat.getPath());
-        reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen());
-        // open tmp file
+        final long cblen = srcstat.getLen();
+        reporter.incrCounter(Counter.BYTESEXPECTED, cblen);
+        // open dst file
         out = preserve_status
         out = preserve_status
-          ? destFileSys.create(tmpfile, true, sizeBuf, srcstat.getReplication(),
+          ? destFileSys.create(dstpath, true, sizeBuf, srcstat.getReplication(),
              srcstat.getBlockSize(), reporter)
              srcstat.getBlockSize(), reporter)
-          : destFileSys.create(tmpfile, reporter);
+          : destFileSys.create(dstpath, reporter);
         // copy file
         // copy file
         int cbread;
         int cbread;
         while ((cbread = in.read(buffer)) >= 0) {
         while ((cbread = in.read(buffer)) >= 0) {
           out.write(buffer, 0, cbread);
           out.write(buffer, 0, cbread);
           cbcopied += cbread;
           cbcopied += cbread;
-          reporter.setStatus(pcntfmt.format(100.0 * cbcopied / srcstat.getLen())
-              + " " + absdst + " [ " +
+          reporter.setStatus(pcntfmt.format(100.0 * cbcopied / cblen) +
+              " " + dstpath + " [ " +
               StringUtils.humanReadableInt(cbcopied) + " / " +
               StringUtils.humanReadableInt(cbcopied) + " / " +
-              StringUtils.humanReadableInt(srcstat.getLen()) + " ]");
+              StringUtils.humanReadableInt(cblen) + " ]");
         }
         }
-      } finally {
-        checkAndClose(in);
-        checkAndClose(out);
-      }
-
-      final boolean success = cbcopied == srcstat.getLen();
-      if (!success) {
-        final String badlen = "ERROR? copied " + bytesString(cbcopied)
-            + " but expected " + bytesString(srcstat.getLen()) 
-            + " from " + srcstat.getPath();
-        LOG.warn(badlen);
-        outc.collect(null, new Text(badlen));
-      }
-      else {
-        if (totfiles == 1) {
-          // Copying a single file; use dst path provided by user as destination
-          // rather than destination directory
-          absdst = absdst.getParent();
+        if (cbcopied != cblen) {
+          final String badlen = "ERROR? copied " + cbcopied + " bytes (" +
+              StringUtils.humanReadableInt(cbcopied) + ") expected " +
+              cblen + " bytes (" + StringUtils.humanReadableInt(cblen) +
+              ") from " + srcstat.getPath();
+          LOG.warn(badlen);
+          outc.collect(null, new Text(badlen));
         }
         }
-        rename(destFileSys, tmpfile, absdst);
+      } finally {
+        if (in != null)
+          in.close();
+        if (out != null)
+          out.close();
       }
       }
-
       // report at least once for each file
       // report at least once for each file
       ++copycount;
       ++copycount;
       reporter.incrCounter(Counter.BYTESCOPIED, cbcopied);
       reporter.incrCounter(Counter.BYTESCOPIED, cbcopied);
       reporter.incrCounter(Counter.COPY, 1);
       reporter.incrCounter(Counter.COPY, 1);
       updateStatus(reporter);
       updateStatus(reporter);
     }
     }
-    
-    /** rename tmp to dst, delete dst if already exists */
-    private void rename(FileSystem fs, Path tmp, Path dst) throws IOException {
-      try {
-        if (fs.exists(dst)) {
-          fs.delete(dst);
-        }
-        fs.rename(tmp, dst);
-      }
-      catch(IOException cause) {
-        IOException ioe = new IOException("Fail to rename tmp file (=" + tmp 
-            + ") to destination file (=" + dst + ")");
-        ioe.initCause(cause);
-        throw ioe;
-      }
-    }
-    
-    static String bytesString(long b) {
-      return b + " bytes (" + StringUtils.humanReadableInt(b) + ")";
-    }
 
 
     /** Mapper configuration.
     /** Mapper configuration.
      * Extracts source and destination file system, as well as
      * Extracts source and destination file system, as well as
@@ -351,7 +337,7 @@ public class CopyFiles implements Tool {
      */
      */
     public void configure(JobConf job)
     public void configure(JobConf job)
     {
     {
-      destPath = new Path(job.get(DST_DIR_LABEL, "/"));
+      destPath = new Path(job.get("copy.dest.path", "/"));
       try {
       try {
         destFileSys = destPath.getFileSystem(job);
         destFileSys = destPath.getFileSystem(job);
       } catch (IOException ex) {
       } catch (IOException ex) {
@@ -452,15 +438,27 @@ public class CopyFiles implements Tool {
     } else {
     } else {
       tmp.add(src);
       tmp.add(src);
     }
     }
-    EnumSet<Options> flags = ignoreReadFailures
-      ? EnumSet.of(Options.IGNORE_READ_FAILURES)
-      : EnumSet.noneOf(Options.class);
+    EnumSet<cpOpts> flags = ignoreReadFailures
+      ? EnumSet.of(cpOpts.IGNORE_READ_FAILURES)
+      : EnumSet.noneOf(cpOpts.class);
     copy(conf, tmp, new Path(destPath), logPath, flags);
     copy(conf, tmp, new Path(destPath), logPath, flags);
   }
   }
 
 
-  /** Sanity check for srcPath */
-  private static void checkSrcPath(Configuration conf, List<Path> srcPaths
-      ) throws IOException {
+  /**
+   * Driver to copy srcPath to destPath depending on required protocol.
+   * @param srcPaths list of source paths
+   * @param destPath Destination path
+   * @param logPath Log output directory
+   * @param flags Command-line flags
+   */
+  public static void copy(Configuration conf, List<Path> srcPaths,
+      Path destPath, Path logPath,
+      EnumSet<cpOpts> flags) throws IOException {
+    //Job configuration
+    JobConf job = new JobConf(conf, CopyFiles.class);
+    job.setJobName("distcp");
+
+    //Sanity check for srcPath/destPath
     List<IOException> rslt = new ArrayList<IOException>();
     List<IOException> rslt = new ArrayList<IOException>();
     for (Path p : srcPaths) {
     for (Path p : srcPaths) {
       FileSystem fs = p.getFileSystem(conf);
       FileSystem fs = p.getFileSystem(conf);
@@ -471,33 +469,21 @@ public class CopyFiles implements Tool {
     if (!rslt.isEmpty()) {
     if (!rslt.isEmpty()) {
       throw new InvalidInputException(rslt);
       throw new InvalidInputException(rslt);
     }
     }
-  }
-
-  /**
-   * Driver to copy srcPath to destPath depending on required protocol.
-   * @param srcPaths list of source paths
-   * @param destPath Destination path
-   * @param logPath Log output directory
-   * @param flags Command-line flags
-   */
-  public static void copy(Configuration conf, List<Path> srcPaths,
-      Path destPath, Path logPath,
-      EnumSet<Options> flags) throws IOException {
-    checkSrcPath(conf, srcPaths);
 
 
-    JobConf job = createJobConf(conf);
     //Initialize the mapper
     //Initialize the mapper
     try {
     try {
       setup(conf, job, srcPaths, destPath, logPath, flags);
       setup(conf, job, srcPaths, destPath, logPath, flags);
       JobClient.runJob(job);
       JobClient.runJob(job);
     } finally {
     } finally {
-      //delete tmp
-      fullyDelete(job.get(TMP_DIR_LABEL), job);
-      //delete jobDirectory
-      fullyDelete(job.get("distcp.job.dir"), job);
+      cleanup(conf, job);
     }
     }
   }
   }
 
 
+  enum cpOpts { IGNORE_READ_FAILURES,
+                PRESERVE_STATUS,
+                OVERWRITE,
+                UPDATE }
+
   /**
   /**
    * This is the main driver for recursively copying directories
    * This is the main driver for recursively copying directories
    * across file systems. It takes at least two cmdline parameters. A source
    * across file systems. It takes at least two cmdline parameters. A source
@@ -510,17 +496,17 @@ public class CopyFiles implements Tool {
     List<Path> srcPath = new ArrayList<Path>();
     List<Path> srcPath = new ArrayList<Path>();
     Path destPath = null;
     Path destPath = null;
     Path logPath = null;
     Path logPath = null;
-    EnumSet<Options> flags = EnumSet.noneOf(Options.class);
+    EnumSet<cpOpts> flags = EnumSet.noneOf(cpOpts.class);
 
 
     for (int idx = 0; idx < args.length; idx++) {
     for (int idx = 0; idx < args.length; idx++) {
       if ("-i".equals(args[idx])) {
       if ("-i".equals(args[idx])) {
-        flags.add(Options.IGNORE_READ_FAILURES);
+        flags.add(cpOpts.IGNORE_READ_FAILURES);
       } else if ("-p".equals(args[idx])) {
       } else if ("-p".equals(args[idx])) {
-        flags.add(Options.PRESERVE_STATUS);
+        flags.add(cpOpts.PRESERVE_STATUS);
       } else if ("-overwrite".equals(args[idx])) {
       } else if ("-overwrite".equals(args[idx])) {
-        flags.add(Options.OVERWRITE);
+        flags.add(cpOpts.OVERWRITE);
       } else if ("-update".equals(args[idx])) {
       } else if ("-update".equals(args[idx])) {
-        flags.add(Options.UPDATE);
+        flags.add(cpOpts.UPDATE);
       } else if ("-f".equals(args[idx])) {
       } else if ("-f".equals(args[idx])) {
         if (++idx ==  args.length) {
         if (++idx ==  args.length) {
           System.out.println("urilist_uri not specified");
           System.out.println("urilist_uri not specified");
@@ -555,16 +541,13 @@ public class CopyFiles implements Tool {
       return -1;
       return -1;
     }
     }
     // incompatible command-line flags
     // incompatible command-line flags
-    if (flags.contains(Options.OVERWRITE) && flags.contains(Options.UPDATE)) {
+    if (flags.contains(cpOpts.OVERWRITE) && flags.contains(cpOpts.UPDATE)) {
       System.out.println("Conflicting overwrite policies");
       System.out.println("Conflicting overwrite policies");
       System.out.println(usage);
       System.out.println(usage);
       return -1;
       return -1;
     }
     }
     try {
     try {
       copy(conf, srcPath, destPath, logPath, flags);
       copy(conf, srcPath, destPath, logPath, flags);
-    } catch (DuplicationException e) {
-      System.err.println(StringUtils.stringifyException(e));
-      return DuplicationException.ERROR_CODE;
     } catch (Exception e) {
     } catch (Exception e) {
       System.err.println("With failures, global counters are inaccurate; " +
       System.err.println("With failures, global counters are inaccurate; " +
           "consider running with -i");
           "consider running with -i");
@@ -624,50 +607,22 @@ public class CopyFiles implements Tool {
     numMaps = Math.min(numMaps, numNodes * MAX_MAPS_PER_NODE);
     numMaps = Math.min(numMaps, numNodes * MAX_MAPS_PER_NODE);
     return Math.max(numMaps, 1);
     return Math.max(numMaps, 1);
   }
   }
-
-  /** Fully delete dir */
-  static void fullyDelete(String dir, Configuration conf) throws IOException {
-    if (dir != null) {
-      Path tmp = new Path(dir);
-      FileUtil.fullyDelete(tmp.getFileSystem(conf), tmp);
+  /**
+   * Delete the temporary dir containing the src file list.
+   * @param conf The dfs/mapred configuration
+   * @param jobConf The handle to the jobConf object
+   */
+  private static void cleanup(Configuration conf, JobConf jobConf)
+      throws IOException {
+    //Clean up jobDirectory
+    String jobDirName = jobConf.get("distdp.job.dir");
+    if (jobDirName != null) {
+      Path jobDirectory = new Path(jobDirName);
+      FileSystem fs = jobDirectory.getFileSystem(jobConf);
+      FileUtil.fullyDelete(fs, jobDirectory);
     }
     }
   }
   }
 
 
-  //Job configuration
-  private static JobConf createJobConf(Configuration conf) {
-    JobConf jobconf = new JobConf(conf, CopyFiles.class);
-    jobconf.setJobName(NAME);
-
-    // turn off speculative execution, because DFS doesn't handle
-    // multiple writers to the same file.
-    jobconf.setMapSpeculativeExecution(false);
-
-    jobconf.setInputFormat(CopyInputFormat.class);
-    jobconf.setOutputKeyClass(Text.class);
-    jobconf.setOutputValueClass(Text.class);
-
-    jobconf.setMapperClass(FSCopyFilesMapper.class);
-    jobconf.setNumReduceTasks(0);
-    return jobconf;
-  }
-
-  private static final Random RANDOM = new Random();
-  private static String getRandomId() {
-    return Integer.toString(RANDOM.nextInt(Integer.MAX_VALUE), 36);
-  }
-
-  private static boolean setBooleans(JobConf jobConf, EnumSet<Options> flags) {
-    boolean update = flags.contains(Options.UPDATE);
-    boolean overwrite = !update && flags.contains(Options.OVERWRITE);
-    jobConf.setBoolean("distcp.overwrite.ifnewer", update);
-    jobConf.setBoolean("distcp.overwrite.always", overwrite);
-    jobConf.setBoolean("distcp.ignore.read.failures",
-        flags.contains(Options.IGNORE_READ_FAILURES));
-    jobConf.setBoolean("distcp.preserve.status.info",
-        flags.contains(Options.PRESERVE_STATUS));
-    return update || overwrite;
-  }
-
   /**
   /**
    * Initialize DFSCopyFileMapper specific job-configuration.
    * Initialize DFSCopyFileMapper specific job-configuration.
    * @param conf : The dfs/mapred configuration.
    * @param conf : The dfs/mapred configuration.
@@ -678,27 +633,46 @@ public class CopyFiles implements Tool {
    * @param flags : Command-line flags
    * @param flags : Command-line flags
    */
    */
   private static void setup(Configuration conf, JobConf jobConf,
   private static void setup(Configuration conf, JobConf jobConf,
-                            List<Path> srcPaths, final Path destPath,
-                            Path logPath, EnumSet<Options> flags)
+                            List<Path> srcPaths, Path destPath,
+                            Path logPath, EnumSet<cpOpts> flags)
       throws IOException {
       throws IOException {
-    jobConf.set(DST_DIR_LABEL, destPath.toUri().toString());
-    final boolean updateORoverwrite = setBooleans(jobConf, flags);
+    boolean update;
+    boolean overwrite;
+    jobConf.set("copy.dest.path", destPath.toUri().toString());
+
+    // turn off speculative execution, because DFS doesn't handle
+    // multiple writers to the same file.
+    jobConf.setSpeculativeExecution(false);
+
+    jobConf.setInputFormat(CopyInputFormat.class);
+
+    jobConf.setOutputKeyClass(Text.class);
+    jobConf.setOutputValueClass(Text.class);
+
+    jobConf.setMapperClass(FSCopyFilesMapper.class);
 
 
-    final String randomId = getRandomId();
+    jobConf.setNumReduceTasks(0);
+    jobConf.setBoolean("distcp.ignore.read.failures",
+        flags.contains(cpOpts.IGNORE_READ_FAILURES));
+    jobConf.setBoolean("distcp.preserve.status.info",
+        flags.contains(cpOpts.PRESERVE_STATUS));
+    jobConf.setBoolean("distcp.overwrite.ifnewer",
+        update = flags.contains(cpOpts.UPDATE));
+    jobConf.setBoolean("distcp.overwrite.always",
+        overwrite = !update && flags.contains(cpOpts.OVERWRITE));
+
+    Random r = new Random();
+    String randomId = Integer.toString(r.nextInt(Integer.MAX_VALUE), 36);
     Path jobDirectory = new Path(jobConf.getSystemDir(), "distcp_" + randomId);
     Path jobDirectory = new Path(jobConf.getSystemDir(), "distcp_" + randomId);
     jobConf.set("distcp.job.dir", jobDirectory.toString());
     jobConf.set("distcp.job.dir", jobDirectory.toString());
-
-    FileSystem dstfs = destPath.getFileSystem(conf);
-    boolean dstExists = dstfs.exists(destPath);
-    boolean dstIsDir = false;
-    if (dstExists) {
-      dstIsDir = dstfs.getFileStatus(destPath).isDir();
-    }
+    Path srcfilelist = new Path(jobDirectory, "_distcp_src_files");
+    jobConf.set("distcp.src.list", srcfilelist.toString());
 
 
     // default logPath
     // default logPath
+    FileSystem dstfs = destPath.getFileSystem(conf);
     if (logPath == null) {
     if (logPath == null) {
       String filename = "_distcp_logs_" + randomId;
       String filename = "_distcp_logs_" + randomId;
-      if (!dstExists || !dstIsDir) {
+      if (!dstfs.exists(destPath) || !dstfs.getFileStatus(destPath).isDir()) {
         Path parent = destPath.getParent();
         Path parent = destPath.getParent();
         dstfs.mkdirs(parent);
         dstfs.mkdirs(parent);
         logPath = new Path(parent, filename);
         logPath = new Path(parent, filename);
@@ -707,32 +681,31 @@ public class CopyFiles implements Tool {
       }
       }
     }
     }
     jobConf.setOutputPath(logPath);
     jobConf.setOutputPath(logPath);
-    
-    // create src list, dst list
-    FileSystem jobfs = jobDirectory.getFileSystem(jobConf);
 
 
-    Path srcfilelist = new Path(jobDirectory, "_distcp_src_files");
-    jobConf.set("distcp.src.list", srcfilelist.toString());
-    SequenceFile.Writer src_writer = SequenceFile.createWriter(jobfs, jobConf,
-        srcfilelist, LongWritable.class, FilePair.class,
+    // create src list
+    SequenceFile.Writer writer = SequenceFile.createWriter(
+        jobDirectory.getFileSystem(jobConf), jobConf, srcfilelist,
+        LongWritable.class, FilePair.class,
         SequenceFile.CompressionType.NONE);
         SequenceFile.CompressionType.NONE);
 
 
-    Path dstfilelist = new Path(jobDirectory, "_distcp_dst_files");
-    SequenceFile.Writer dst_writer = SequenceFile.createWriter(jobfs, jobConf,
-        dstfilelist, Text.class, Text.class,
-        SequenceFile.CompressionType.NONE);
-
-    // handle the case where the destination directory doesn't exist
-    // and we've only a single src directory OR we're updating/overwriting
-    // the contents of the destination directory.
-    final boolean special =
-      (srcPaths.size() == 1 && !dstExists) || updateORoverwrite;
-    int cnfiles = 0, cnsyncf = 0;
-    long cbsize = 0L, cbsyncs = 0L;
+    int cnfiles = 0;
+    long cbsize = 0L;
     try {
     try {
+      // handle the case where the destination directory doesn't exist
+      // and we've only a single src directory OR we're updating/overwriting
+      // the contents of the destination directory.
+      final boolean special_case =
+        (srcPaths.size() == 1 && !dstfs.exists(destPath))
+        || update || overwrite;
+      int cnsyncf = 0;
+      long cbsyncs = 0L;
       for (Path p : srcPaths) {
       for (Path p : srcPaths) {
+        Path root = p.getParent();
         FileSystem fs = p.getFileSystem(conf);
         FileSystem fs = p.getFileSystem(conf);
-        Path root = special && fs.getFileStatus(p).isDir()? p: p.getParent(); 
+
+        if (special_case && fs.getFileStatus(p).isDir()) {
+          root = p;
+        }
 
 
         Stack<Path> pathstack = new Stack<Path>();
         Stack<Path> pathstack = new Stack<Path>();
         pathstack.push(p);
         pathstack.push(p);
@@ -740,100 +713,38 @@ public class CopyFiles implements Tool {
           for (FileStatus stat : fs.listStatus(pathstack.pop())) {
           for (FileStatus stat : fs.listStatus(pathstack.pop())) {
             if (stat.isDir()) {
             if (stat.isDir()) {
               pathstack.push(stat.getPath());
               pathstack.push(stat.getPath());
-            }
-            else {
+            } else {
               ++cnsyncf;
               ++cnsyncf;
               cbsyncs += stat.getLen();
               cbsyncs += stat.getLen();
               ++cnfiles;
               ++cnfiles;
               cbsize += stat.getLen();
               cbsize += stat.getLen();
-
-              if (cnsyncf > SYNC_FILE_MAX || cbsyncs > BYTES_PER_MAP) {
-                src_writer.sync();
-                dst_writer.sync();
-                cnsyncf = 0;
-                cbsyncs = 0L;
-              }
             }
             }
-
-            Path dst = makeRelative(root, stat.getPath());
-            src_writer.append(new LongWritable(stat.isDir()? 0: stat.getLen()),
-                new FilePair(stat, dst));
-            dst_writer.append(new Text(dst.toString()),
-                new Text(stat.getPath().toString()));
+            if (cnsyncf > SYNC_FILE_MAX || cbsyncs > BYTES_PER_MAP) {
+              writer.sync();
+              cnsyncf = 0;
+              cbsyncs = 0L;
+            }
+            writer.append(new LongWritable(stat.isDir() ? 0 : stat.getLen()),
+                          new FilePair(stat, new Path(destPath,
+                              makeRelative(root, stat.getPath()))));
           }
           }
         }
         }
       }
       }
     } finally {
     } finally {
-      checkAndClose(src_writer);
-      checkAndClose(dst_writer);
+      writer.close();
     }
     }
 
 
     // create dest path dir if copying > 1 file
     // create dest path dir if copying > 1 file
-    if (!dstfs.exists(destPath)) {
-      if (cnfiles > 1 && !dstfs.mkdirs(destPath)) {
-        throw new IOException("Failed to create" + destPath);
-      }
+    if (cnfiles > 1 && !dstfs.mkdirs(destPath)) {
+      throw new IOException("Failed to create" + destPath);
     }
     }
-    
-    checkDuplication(jobfs, dstfilelist,
-        new Path(jobDirectory, "_distcp_sorted"), conf);
-
-    Path tmpDir = new Path(
-        (dstExists && !dstIsDir) || (!dstExists && cnfiles == 1)?
-        destPath.getParent(): destPath, "_distcp_tmp_" + randomId);
-    jobConf.set(TMP_DIR_LABEL, tmpDir.toUri().toString());
+
     jobConf.setInt("distcp.file.count", cnfiles);
     jobConf.setInt("distcp.file.count", cnfiles);
     jobConf.setLong("distcp.total.size", cbsize);
     jobConf.setLong("distcp.total.size", cbsize);
-    jobConf.setNumMapTasks(getMapCount(cbsize,
-        new JobClient(jobConf).getClusterStatus().getTaskTrackers()));
-  }
-
-  static private void checkDuplication(FileSystem fs, Path file, Path sorted,
-    Configuration conf) throws IOException {
-    SequenceFile.Reader in = null;
-    try {
-      SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
-        new Text.Comparator(), Text.class, conf);
-      sorter.sort(file, sorted);
-      in = new SequenceFile.Reader(fs, sorted, conf);
-
-      Text prevdst = null, curdst = new Text();
-      Text prevsrc = null, cursrc = new Text(); 
-      for(; in.next(curdst, cursrc); ) {
-        if (prevdst != null && curdst.equals(prevdst)) {
-          throw new DuplicationException(
-            "Invalid input, there are duplicated files in the sources: "
-            + prevsrc + ", " + cursrc);
-        }
-        prevdst = curdst;
-        curdst = new Text();
-        prevsrc = cursrc;
-        cursrc = new Text();
-      }
-    }
-    finally {
-      checkAndClose(in);
-    }
-  } 
 
 
-  static boolean checkAndClose(java.io.Closeable io) {
-    if (io != null) {
-      try {
-        io.close();
-      }
-      catch(IOException ioe) {
-        LOG.warn(StringUtils.stringifyException(ioe));
-        return false;
-      }
-    }
-    return true;
+    JobClient client = new JobClient(jobConf);
+    jobConf.setNumMapTasks(getMapCount(cbsize,
+          client.getClusterStatus().getTaskTrackers()));
   }
   }
 
 
-  /** An exception class for duplicated source files. */
-  public static class DuplicationException extends IOException {
-    private static final long serialVersionUID = 1L;
-    /** Error code for this exception */
-    public static final int ERROR_CODE = -2;
-    DuplicationException(String message) {super(message);}
-  }
 }
 }

+ 0 - 21
src/test/org/apache/hadoop/fs/TestCopyFiles.java

@@ -388,25 +388,4 @@ public class TestCopyFiles extends TestCase {
     }
     }
   }
   }
 
 
-  public void testCopyDuplication() throws Exception {
-    try {    
-      MyFile[] files = createFiles("local", TEST_ROOT_DIR+"/srcdat");
-      ToolRunner.run(new CopyFiles(new Configuration()),
-          new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
-                        "file:///"+TEST_ROOT_DIR+"/src2/srcdat"});
-      assertTrue("Source and destination directories do not match.",
-                 checkFiles("local", TEST_ROOT_DIR+"/src2/srcdat", files));
-  
-      assertEquals(CopyFiles.DuplicationException.ERROR_CODE,
-          ToolRunner.run(new CopyFiles(new Configuration()),
-          new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
-                        "file:///"+TEST_ROOT_DIR+"/src2/srcdat",
-                        "file:///"+TEST_ROOT_DIR+"/destdst",}));
-    }
-    finally {
-      deldir("local", TEST_ROOT_DIR+"/destdat");
-      deldir("local", TEST_ROOT_DIR+"/srcdat");
-      deldir("local", TEST_ROOT_DIR+"/src2");
-    }
-  }
 }
 }