
MAPREDUCE-5899. Merge change r1596931 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1596937 13f79535-47bb-0310-9956-ffa450edef68
Jing Zhao 11 years ago
parent
commit
50acb4032f
22 changed files with 592 additions and 188 deletions
  1. + 13 - 1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
  2. + 6 - 2
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
  3. + 1 - 1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
  4. + 9 - 2
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
  5. + 1 - 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
  6. + 23 - 12
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  7. + 27 - 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
  8. + 61 - 16
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
  9. + 1 - 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
  10. + 1 - 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
  11. + 75 - 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetFileChecksum.java
  12. + 2 - 0
      hadoop-mapreduce-project/CHANGES.txt
  13. + 1 - 0
      hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
  14. + 4 - 0
      hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
  15. + 28 - 0
      hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
  16. + 4 - 0
      hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
  17. + 70 - 35
      hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
  18. + 73 - 47
      hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
  19. + 24 - 0
      hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
  20. + 44 - 0
      hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
  21. + 121 - 62
      hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
  22. + 3 - 2
      hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java

+ 13 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

@@ -2141,9 +2141,21 @@ public abstract class FileSystem extends Configured implements Closeable {
   *  in the corresponding FileSystem.
   */
  public FileChecksum getFileChecksum(Path f) throws IOException {
+    return getFileChecksum(f, Long.MAX_VALUE);
+  }
+
+  /**
+   * Get the checksum of a file, from the beginning of the file till the
+   * specific length.
+   * @param f The file path
+   * @param length The length of the file range for checksum calculation
+   * @return The file checksum.
+   */
+  public FileChecksum getFileChecksum(Path f, final long length)
+      throws IOException {
     return null;
   }
-  
+
   /**
    * Set the verify checksum flag. This is only applicable if the 
    * corresponding FileSystem supports checksum. By default doesn't do anything.

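For context (not part of this commit): a minimal usage sketch of the new length-limited checksum API introduced above. The file path and prefix length below are made-up illustration values; only the FileSystem calls themselves come from the change.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileChecksum;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class RangedChecksumExample {  // hypothetical illustration class
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path file = new Path("/tmp/example.dat");  // hypothetical path
        long prefixLength = 4 * 1024;              // hypothetical range length
        // Checksum of the first prefixLength bytes only (new overload above).
        FileChecksum prefix = fs.getFileChecksum(file, prefixLength);
        // Checksum of the whole file; equivalent to passing Long.MAX_VALUE.
        FileChecksum whole = fs.getFileChecksum(file);
        System.out.println("prefix equals whole: "
            + (prefix != null && prefix.equals(whole)));
      }
    }
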
+ 6 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java

@@ -30,7 +30,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.util.Progressable;
@@ -427,7 +426,12 @@ public class FilterFileSystem extends FileSystem {
   public FileChecksum getFileChecksum(Path f) throws IOException {
     return fs.getFileChecksum(f);
   }
-  
+
+  @Override
+  public FileChecksum getFileChecksum(Path f, long length) throws IOException {
+    return fs.getFileChecksum(f, length);
+  }
+
   @Override
   public void setVerifyChecksum(boolean verifyChecksum) {
     fs.setVerifyChecksum(verifyChecksum);

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java

@@ -687,7 +687,7 @@ public class HarFileSystem extends FileSystem {
    * @return null since no checksum algorithm is implemented.
    */
   @Override
-  public FileChecksum getFileChecksum(Path f) {
+  public FileChecksum getFileChecksum(Path f, long length) {
     return null;
   }
 

+ 9 - 2
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java

@@ -138,6 +138,7 @@ public class TestHarFileSystem {
     public int getDefaultPort();
     public String getCanonicalServiceName();
     public Token<?> getDelegationToken(String renewer) throws IOException;
+    public FileChecksum getFileChecksum(Path f) throws IOException;
     public boolean deleteOnExit(Path f) throws IOException;
     public boolean cancelDeleteOnExit(Path f) throws IOException;
     public Token<?>[] addDelegationTokens(String renewer, Credentials creds)
@@ -207,10 +208,16 @@ public class TestHarFileSystem {
   }
 
   @Test
-  public void testFileChecksum() {
+  public void testFileChecksum() throws Exception {
     final Path p = new Path("har://file-localhost/foo.har/file1");
     final HarFileSystem harfs = new HarFileSystem();
-    Assert.assertEquals(null, harfs.getFileChecksum(p));
+    try {
+      Assert.assertEquals(null, harfs.getFileChecksum(p));
+    } finally {
+      if (harfs != null) {
+        harfs.close();
+      }
+    }
   }
 
   /**

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java

@@ -115,7 +115,7 @@ public class Hdfs extends AbstractFileSystem {
   @Override
   public FileChecksum getFileChecksum(Path f) 
       throws IOException, UnresolvedLinkException {
-    return dfs.getFileChecksum(getUriPath(f));
+    return dfs.getFileChecksum(getUriPath(f), Long.MAX_VALUE);
   }
 
   @Override

+ 23 - 12
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -1817,15 +1817,19 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
    }
 
   /**
-   * Get the checksum of a file.
+   * Get the checksum of the whole file of a range of the file. Note that the
+   * range always starts from the beginning of the file.
    * @param src The file path
+   * @param length The length of the range
    * @return The checksum 
    * @see DistributedFileSystem#getFileChecksum(Path)
    */
-  public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
+  public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
+      throws IOException {
     checkOpen();
-    return getFileChecksum(src, clientName, namenode, socketFactory,
-        dfsClientConf.socketTimeout, getDataEncryptionKey(),
+    Preconditions.checkArgument(length >= 0);
+    return getFileChecksum(src, length, clientName, namenode,
+        socketFactory, dfsClientConf.socketTimeout, getDataEncryptionKey(),
         dfsClientConf.connectToDnViaHostname);
   }
   
@@ -1866,8 +1870,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
   }
 
   /**
-   * Get the checksum of a file.
+   * Get the checksum of the whole file or a range of the file.
    * @param src The file path
+   * @param length the length of the range, i.e., the range is [0, length]
    * @param clientName the name of the client requesting the checksum.
    * @param namenode the RPC proxy for the namenode
    * @param socketFactory to create sockets to connect to DNs
@@ -1877,12 +1882,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
    * @return The checksum 
    */
   private static MD5MD5CRC32FileChecksum getFileChecksum(String src,
-      String clientName,
-      ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
+      long length, String clientName, ClientProtocol namenode,
+      SocketFactory socketFactory, int socketTimeout,
       DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
       throws IOException {
-    //get all block locations
-    LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
+    //get block locations for the file range
+    LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0,
+        length);
     if (null == blockLocations) {
       throw new FileNotFoundException("File does not exist: " + src);
     }
@@ -1894,10 +1900,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
     boolean refetchBlocks = false;
     int lastRetriedIndex = -1;
 
-    //get block checksum for each block
-    for(int i = 0; i < locatedblocks.size(); i++) {
+    // get block checksum for each block
+    long remaining = length;
+    for(int i = 0; i < locatedblocks.size() && remaining > 0; i++) {
       if (refetchBlocks) {  // refetch to get fresh tokens
-        blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
+        blockLocations = callGetBlockLocations(namenode, src, 0, length);
         if (null == blockLocations) {
           throw new FileNotFoundException("File does not exist: " + src);
         }
@@ -1906,6 +1913,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
       }
       LocatedBlock lb = locatedblocks.get(i);
       final ExtendedBlock block = lb.getBlock();
+      if (remaining < block.getNumBytes()) {
+        block.setNumBytes(remaining);
+      }
+      remaining -= block.getNumBytes();
       final DatanodeInfo[] datanodes = lb.getLocations();
       
       //try each datanode location of the block

+ 27 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -66,14 +66,12 @@ import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
@@ -83,7 +81,6 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Progressable;
 
@@ -1188,7 +1185,7 @@ public class DistributedFileSystem extends FileSystem {
       @Override
       public FileChecksum doCall(final Path p)
           throws IOException, UnresolvedLinkException {
-        return dfs.getFileChecksum(getPathName(p));
+        return dfs.getFileChecksum(getPathName(p), Long.MAX_VALUE);
       }
 
       @Override
@@ -1199,6 +1196,32 @@ public class DistributedFileSystem extends FileSystem {
     }.resolve(this, absF);
   }
 
+  @Override
+  public FileChecksum getFileChecksum(Path f, final long length)
+      throws IOException {
+    statistics.incrementReadOps(1);
+    Path absF = fixRelativePart(f);
+    return new FileSystemLinkResolver<FileChecksum>() {
+      @Override
+      public FileChecksum doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        return dfs.getFileChecksum(getPathName(p), length);
+      }
+
+      @Override
+      public FileChecksum next(final FileSystem fs, final Path p)
+          throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          return ((DistributedFileSystem) fs).getFileChecksum(p, length);
+        } else {
+          throw new UnsupportedFileSystemException(
+              "getFileChecksum(Path, long) is not supported by "
+                  + fs.getClass().getSimpleName()); 
+        }
+      }
+    }.resolve(this, absF);
+  }
+
   @Override
   public void setPermission(Path p, final FsPermission permission
       ) throws IOException {

+ 61 - 16
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -42,6 +42,7 @@ import java.net.Socket;
 import java.net.SocketException;
 import java.net.UnknownHostException;
 import java.nio.channels.ClosedChannelException;
+import java.security.MessageDigest;
 import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
@@ -83,6 +84,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
+import com.google.common.base.Preconditions;
 import com.google.common.net.InetAddresses;
 import com.google.protobuf.ByteString;
 
@@ -802,7 +804,44 @@ class DataXceiver extends Receiver implements Runnable {
       IOUtils.closeStream(out);
     }
   }
-  
+
+  private MD5Hash calcPartialBlockChecksum(ExtendedBlock block,
+      long requestLength, DataChecksum checksum, DataInputStream checksumIn)
+      throws IOException {
+    final int bytesPerCRC = checksum.getBytesPerChecksum();
+    final int csize = checksum.getChecksumSize();
+    final byte[] buffer = new byte[4*1024];
+    MessageDigest digester = MD5Hash.getDigester();
+
+    long remaining = requestLength / bytesPerCRC * csize;
+    for (int toDigest = 0; remaining > 0; remaining -= toDigest) {
+      toDigest = checksumIn.read(buffer, 0,
+          (int) Math.min(remaining, buffer.length));
+      if (toDigest < 0) {
+        break;
+      }
+      digester.update(buffer, 0, toDigest);
+    }
+    
+    int partialLength = (int) (requestLength % bytesPerCRC);
+    if (partialLength > 0) {
+      byte[] buf = new byte[partialLength];
+      final InputStream blockIn = datanode.data.getBlockInputStream(block,
+          requestLength - partialLength);
+      try {
+        // Get the CRC of the partialLength.
+        IOUtils.readFully(blockIn, buf, 0, partialLength);
+      } finally {
+        IOUtils.closeStream(blockIn);
+      }
+      checksum.update(buf, 0, partialLength);
+      byte[] partialCrc = new byte[csize];
+      checksum.writeValue(partialCrc, 0, true);
+      digester.update(partialCrc);
+    }
+    return new MD5Hash(digester.digest());
+  }
+
   @Override
   public void blockChecksum(final ExtendedBlock block,
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
@@ -810,25 +849,32 @@ class DataXceiver extends Receiver implements Runnable {
         getOutputStream());
     checkAccess(out, true, block, blockToken,
         Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
-    updateCurrentThreadName("Reading metadata for block " + block);
-    final LengthInputStream metadataIn = 
-      datanode.data.getMetaDataInputStream(block);
-    final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream(
-        metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
+    // client side now can specify a range of the block for checksum
+    long requestLength = block.getNumBytes();
+    Preconditions.checkArgument(requestLength >= 0);
+    long visibleLength = datanode.data.getReplicaVisibleLength(block);
+    boolean partialBlk = requestLength < visibleLength;
 
 
+    updateCurrentThreadName("Reading metadata for block " + block);
+    final LengthInputStream metadataIn = datanode.data
+        .getMetaDataInputStream(block);
+    
+    final DataInputStream checksumIn = new DataInputStream(
+        new BufferedInputStream(metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
     updateCurrentThreadName("Getting checksum for block " + block);
     try {
       //read metadata file
-      final BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
-      final DataChecksum checksum = header.getChecksum(); 
+      final BlockMetadataHeader header = BlockMetadataHeader
+          .readHeader(checksumIn);
+      final DataChecksum checksum = header.getChecksum();
+      final int csize = checksum.getChecksumSize();
       final int bytesPerCRC = checksum.getBytesPerChecksum();
-      final long crcPerBlock = checksum.getChecksumSize() > 0 
-              ? (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize())/checksum.getChecksumSize()
-              : 0;
-      
-      //compute block checksum
-      final MD5Hash md5 = MD5Hash.digest(checksumIn);
+      final long crcPerBlock = csize <= 0 ? 0 : 
+        (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize()) / csize;
 
 
+      final MD5Hash md5 = partialBlk && crcPerBlock > 0 ? 
+          calcPartialBlockChecksum(block, requestLength, checksum, checksumIn)
+            : MD5Hash.digest(checksumIn);
       if (LOG.isDebugEnabled()) {
         LOG.debug("block=" + block + ", bytesPerCRC=" + bytesPerCRC
             + ", crcPerBlock=" + crcPerBlock + ", md5=" + md5);
@@ -841,8 +887,7 @@ class DataXceiver extends Receiver implements Runnable {
           .setBytesPerCrc(bytesPerCRC)
           .setCrcPerBlock(crcPerBlock)
           .setMd5(ByteString.copyFrom(md5.getDigest()))
-          .setCrcType(PBHelper.convert(checksum.getChecksumType()))
-          )
+          .setCrcType(PBHelper.convert(checksum.getChecksumType())))
         .build()
         .writeDelimitedTo(out);
       out.flush();

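For context (not part of this commit): a small worked example of the arithmetic that calcPartialBlockChecksum() above relies on. The chunk size, CRC width, and request length are made-up values chosen only to make the division and remainder concrete.

    public class PartialChecksumMath {  // hypothetical illustration class
      public static void main(String[] args) {
        final int bytesPerCRC = 512;      // hypothetical checksum chunk size
        final int checksumSize = 4;       // bytes per stored CRC value
        final long requestLength = 1300;  // hypothetical requested range within the block

        // Stored CRC bytes fed to the MD5 digester for the whole chunks covered.
        long wholeChunkCrcBytes = requestLength / bytesPerCRC * checksumSize; // 2 * 4 = 8
        // Trailing bytes whose CRC must be recomputed from the block data itself.
        int partialLength = (int) (requestLength % bytesPerCRC);              // 276

        System.out.println("CRC bytes digested from the meta file: " + wholeChunkCrcBytes);
        System.out.println("Partial chunk re-read from the block:  " + partialLength);
      }
    }
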
+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java

@@ -74,7 +74,6 @@ import org.apache.hadoop.hdfs.web.resources.PutOpParam;
 import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
 import org.apache.hadoop.hdfs.web.resources.UriFsPathParam;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -452,7 +451,7 @@ public class DatanodeWebHdfsMethods {
       MD5MD5CRC32FileChecksum checksum = null;
       DFSClient dfsclient = newDfsClient(nnId, conf);
       try {
-        checksum = dfsclient.getFileChecksum(fullpath);
+        checksum = dfsclient.getFileChecksum(fullpath, Long.MAX_VALUE);
         dfsclient.close();
         dfsclient = null;
       } finally {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java

@@ -121,7 +121,7 @@ public class FileChecksumServlets {
       try {
         final DFSClient dfs = DatanodeJspHelper.getDFSClient(request, 
             datanode, conf, getUGI(request, conf));
-        final MD5MD5CRC32FileChecksum checksum = dfs.getFileChecksum(path);
+        final MD5MD5CRC32FileChecksum checksum = dfs.getFileChecksum(path, Long.MAX_VALUE);
         MD5MD5CRC32FileChecksum.write(xml, checksum);
       } catch(IOException ioe) {
         writeXml(ioe, path, xml);

+ 75 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetFileChecksum.java

@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestGetFileChecksum {
+  private static final int BLOCKSIZE = 1024;
+  private static final short REPLICATION = 3;
+
+  private Configuration conf;
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem dfs;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION)
+        .build();
+    cluster.waitActive();
+    dfs = cluster.getFileSystem();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  public void testGetFileChecksum(final Path foo, final int appendLength)
+      throws Exception {
+    final int appendRounds = 16;
+    FileChecksum[] fc = new FileChecksum[appendRounds + 1];
+    DFSTestUtil.createFile(dfs, foo, appendLength, REPLICATION, 0L);
+    fc[0] = dfs.getFileChecksum(foo);
+    for (int i = 0; i < appendRounds; i++) {
+      DFSTestUtil.appendFile(dfs, foo, appendLength);
+      fc[i + 1] = dfs.getFileChecksum(foo);
+    }
+
+    for (int i = 0; i < appendRounds + 1; i++) {
+      FileChecksum checksum = dfs.getFileChecksum(foo, appendLength * (i+1));
+      Assert.assertTrue(checksum.equals(fc[i]));
+    }
+  }
+
+  @Test
+  public void testGetFileChecksum() throws Exception {
+    testGetFileChecksum(new Path("/foo"), BLOCKSIZE / 4);
+    testGetFileChecksum(new Path("/bar"), BLOCKSIZE / 4 - 1);
+  }
+}

+ 2 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -60,6 +60,8 @@ Release 2.5.0 - UNRELEASED
 
     MAPREDUCE-5809. Enhance distcp to support preserving HDFS ACLs. (cnauroth)
 
+    MAPREDUCE-5899. Support incremental data copy in DistCp. (jing9)
+
   OPTIMIZATIONS
 
   BUG FIXES 

+ 1 - 0
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java

@@ -50,6 +50,7 @@ public class DistCpConstants {
   public static final String CONF_LABEL_COPY_STRATEGY = "distcp.copy.strategy";
   public static final String CONF_LABEL_SKIP_CRC = "distcp.skip.crc";
   public static final String CONF_LABEL_OVERWRITE = "distcp.copy.overwrite";
+  public static final String CONF_LABEL_APPEND = "distcp.copy.append";
   public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb";
   
   public static final String CONF_LABEL_MAX_CHUNKS_TOLERABLE =

+ 4 - 0
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java

@@ -138,6 +138,10 @@ public enum DistCpOptionSwitch {
       new Option("overwrite", false, "Choose to overwrite target files " +
           "unconditionally, even if they exist.")),
 
+  APPEND(DistCpConstants.CONF_LABEL_APPEND,
+      new Option("append", false,
+          "Reuse existing data in target files and append new data to them if possible")),
+
   /**
    * Should DisctpExecution be blocking
    */

+ 28 - 0
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java

@@ -39,6 +39,7 @@ public class DistCpOptions {
   private boolean deleteMissing = false;
   private boolean ignoreFailures = false;
   private boolean overwrite = false;
+  private boolean append = false;
   private boolean skipCRC = false;
   private boolean blocking = true;
 
@@ -244,6 +245,22 @@ public class DistCpOptions {
     this.overwrite = overwrite;
   }
 
+  /**
+   * @return whether we can append new data to target files
+   */
+  public boolean shouldAppend() {
+    return append;
+  }
+
+  /**
+   * Set if we want to append new data to target files. This is valid only with
+   * update option and CRC is not skipped.
+   */
+  public void setAppend(boolean append) {
+    validate(DistCpOptionSwitch.APPEND, append);
+    this.append = append;
+  }
+
   /**
    * Should CRC/checksum check be skipped while checking files are identical
    *
@@ -472,6 +489,7 @@ public class DistCpOptions {
         value : this.atomicCommit);
     boolean skipCRC = (option == DistCpOptionSwitch.SKIP_CRC ?
         value : this.skipCRC);
+    boolean append = (option == DistCpOptionSwitch.APPEND ? value : this.append);
 
     if (syncFolder && atomicCommit) {
       throw new IllegalArgumentException("Atomic commit can't be used with " +
@@ -492,6 +510,14 @@ public class DistCpOptions {
       throw new IllegalArgumentException("Skip CRC is valid only with update options");
     }
 
+    if (!syncFolder && append) {
+      throw new IllegalArgumentException(
+          "Append is valid only with update options");
+    }
+    if (skipCRC && append) {
+      throw new IllegalArgumentException(
+          "Append is disallowed when skipping CRC");
+    }
   }
 
   /**
@@ -510,6 +536,8 @@ public class DistCpOptions {
         String.valueOf(deleteMissing));
     DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.OVERWRITE,
         String.valueOf(overwrite));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.APPEND,
+        String.valueOf(append));
     DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC,
         String.valueOf(skipCRC));
     DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BANDWIDTH,

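For context (not part of this commit): a minimal sketch of how the new append option is expected to be driven programmatically, based only on the setters and validation shown above. The source and target paths are made up, and the DistCpOptions constructor used here is an assumption about the existing 2.x API rather than something introduced by this change.

    import java.util.Collections;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.tools.DistCpOptions;

    public class AppendOptionSketch {  // hypothetical illustration class
      public static void main(String[] args) {
        DistCpOptions options = new DistCpOptions(
            Collections.singletonList(new Path("hdfs://nn1/src")),  // hypothetical source
            new Path("hdfs://nn2/dst"));                            // hypothetical target
        // Append is only accepted together with the update (sync folder) option
        // and without skipping CRC, as enforced by validate() above.
        options.setSyncFolder(true);
        options.setAppend(true);
        // Calling options.setSkipCRC(true) as well would fail validation with
        // "Append is disallowed when skipping CRC".
      }
    }

On the command line this corresponds to combining the new -append switch with -update (and without -skipcrccheck).
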
+ 4 - 0
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java

@@ -140,6 +140,10 @@ public class OptionsParser {
       option.setOverwrite(true);
     }
 
+    if (command.hasOption(DistCpOptionSwitch.APPEND.getSwitch())) {
+      option.setAppend(true);
+    }
+
     if (command.hasOption(DistCpOptionSwitch.DELETE_MISSING.getSwitch())) {
       option.setDeleteMissing(true);
     }

+ 70 - 35
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java

@@ -18,13 +18,20 @@
 
 package org.apache.hadoop.tools.mapred;
 
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.EnumSet;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -36,11 +43,6 @@ import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
 import org.apache.hadoop.tools.util.DistCpUtils;
 import org.apache.hadoop.util.StringUtils;
 
-import java.io.*;
-import java.util.EnumSet;
-import java.util.Arrays;
-import java.util.List;
-
 /**
  * Mapper class that executes the DistCp copy operation.
  * Implements the o.a.h.mapreduce.Mapper<> interface.
@@ -62,6 +64,15 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
     BYTESSKIPPED, // Number of bytes that were skipped from copy.
   }
 
+  /**
+   * Indicate the action for each file
+   */
+  static enum FileAction {
+    SKIP,         // Skip copying the file since it's already in the target FS
+    APPEND,       // Only need to append new data to the file in the target FS 
+    OVERWRITE,    // Overwrite the whole file
+  }
+
   private static Log LOG = LogFactory.getLog(CopyMapper.class);
 
   private Configuration conf;
@@ -70,6 +81,7 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
   private boolean ignoreFailures = false;
   private boolean skipCrc = false;
   private boolean overWrite = false;
+  private boolean append = false;
   private EnumSet<FileAttribute> preserve = EnumSet.noneOf(FileAttribute.class);
 
   private FileSystem targetFS = null;
@@ -90,6 +102,7 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
     ignoreFailures = conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false);
     skipCrc = conf.getBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), false);
     overWrite = conf.getBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(), false);
+    append = conf.getBoolean(DistCpOptionSwitch.APPEND.getConfigLabel(), false);
     preserve = DistCpUtils.unpackAttributes(conf.get(DistCpOptionSwitch.
         PRESERVE_STATUS.getConfigLabel()));
 
@@ -224,20 +237,19 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
         return;
       }
 
-      if (skipFile(sourceFS, sourceCurrStatus, target)) {
+      FileAction action = checkUpdate(sourceFS, sourceCurrStatus, target);
+      if (action == FileAction.SKIP) {
         LOG.info("Skipping copy of " + sourceCurrStatus.getPath()
                  + " to " + target);
         updateSkipCounters(context, sourceCurrStatus);
         context.write(null, new Text("SKIP: " + sourceCurrStatus.getPath()));
-      }
-      else {
+      } else {
         copyFileWithRetry(description, sourceCurrStatus, target, context,
-                          fileAttributes);
+            action, fileAttributes);
       }
 
       DistCpUtils.preserve(target.getFileSystem(conf), target,
                            sourceCurrStatus, fileAttributes);
-
     } catch (IOException exception) {
       handleFailures(exception, sourceFileStatus, target, context);
     }
@@ -254,14 +266,14 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
     return DistCpUtils.unpackAttributes(attributeString);
   }
 
-  private void copyFileWithRetry(String description, FileStatus sourceFileStatus,
-               Path target, Context context,
-               EnumSet<DistCpOptions.FileAttribute> fileAttributes) throws IOException {
-
+  private void copyFileWithRetry(String description,
+      FileStatus sourceFileStatus, Path target, Context context,
+      FileAction action, EnumSet<DistCpOptions.FileAttribute> fileAttributes)
+      throws IOException {
     long bytesCopied;
     try {
-      bytesCopied = (Long)new RetriableFileCopyCommand(skipCrc, description)
-                       .execute(sourceFileStatus, target, context, fileAttributes);
+      bytesCopied = (Long) new RetriableFileCopyCommand(skipCrc, description,
+          action).execute(sourceFileStatus, target, context, fileAttributes);
     } catch (Exception e) {
       context.setStatus("Copy Failure: " + sourceFileStatus.getPath());
       throw new IOException("File copy failed: " + sourceFileStatus.getPath() +
@@ -311,25 +323,48 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
     context.getCounter(counter).increment(value);
   }
 
-  private boolean skipFile(FileSystem sourceFS, FileStatus source, Path target)
-                                          throws IOException {
-    return     targetFS.exists(target)
-            && !overWrite
-            && !mustUpdate(sourceFS, source, target);
+  private FileAction checkUpdate(FileSystem sourceFS, FileStatus source,
+      Path target) throws IOException {
+    final FileStatus targetFileStatus;
+    try {
+      targetFileStatus = targetFS.getFileStatus(target);
+    } catch (FileNotFoundException e) {
+      return FileAction.OVERWRITE;
+    }
+    if (targetFileStatus != null && !overWrite) {
+      if (canSkip(sourceFS, source, targetFileStatus)) {
+        return FileAction.SKIP;
+      } else if (append) {
+        long targetLen = targetFileStatus.getLen();
+        if (targetLen < source.getLen()) {
+          FileChecksum sourceChecksum = sourceFS.getFileChecksum(
+              source.getPath(), targetLen);
+          if (sourceChecksum != null
+              && sourceChecksum.equals(targetFS.getFileChecksum(target))) {
+            // We require that the checksum is not null. Thus currently only
+            // DistributedFileSystem is supported
+            return FileAction.APPEND;
+          }
+        }
+      }
+    }
+    return FileAction.OVERWRITE;
   }
 
-  private boolean mustUpdate(FileSystem sourceFS, FileStatus source, Path target)
-                                    throws IOException {
-    final FileStatus targetFileStatus = targetFS.getFileStatus(target);
-
-    return     syncFolders
-            && (
-                   targetFileStatus.getLen() != source.getLen()
-                || (!skipCrc &&
-                       !DistCpUtils.checksumsAreEqual(sourceFS,
-                          source.getPath(), null, targetFS, target))
-                || (source.getBlockSize() != targetFileStatus.getBlockSize() &&
-                      preserve.contains(FileAttribute.BLOCKSIZE))
-               );
+  private boolean canSkip(FileSystem sourceFS, FileStatus source, 
+      FileStatus target) throws IOException {
+    if (!syncFolders) {
+      return true;
+    }
+    boolean sameLength = target.getLen() == source.getLen();
+    boolean sameBlockSize = source.getBlockSize() == target.getBlockSize()
+        || !preserve.contains(FileAttribute.BLOCKSIZE);
+    if (sameLength && sameBlockSize) {
+      return skipCrc ||
+          DistCpUtils.checksumsAreEqual(sourceFS, source.getPath(), null,
+              targetFS, target.getPath());
+    } else {
+      return false;
+    }
   }
 }

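In short (editor's summary of the change above, not text from the commit): with -update, the mapper now picks one of three actions per file. It returns SKIP when the target exists, -overwrite is off, and canSkip() holds (same length, compatible block size, and matching checksums unless -skipcrccheck); it returns APPEND when -append is set, the target is shorter than the source, and the checksum of the source prefix of that length matches the target's checksum; otherwise it returns OVERWRITE.
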
+ 73 - 47
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java

@@ -18,10 +18,8 @@
 
 package org.apache.hadoop.tools.mapred;
 
-import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.EnumSet;
 
@@ -29,6 +27,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -39,6 +39,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.tools.DistCpConstants;
 import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
+import org.apache.hadoop.tools.mapred.CopyMapper.FileAction;
 import org.apache.hadoop.tools.util.DistCpUtils;
 import org.apache.hadoop.tools.util.RetriableCommand;
 import org.apache.hadoop.tools.util.ThrottledInputStream;
@@ -54,13 +55,15 @@ public class RetriableFileCopyCommand extends RetriableCommand {
   private static Log LOG = LogFactory.getLog(RetriableFileCopyCommand.class);
   private static int BUFFER_SIZE = 8 * 1024;
   private boolean skipCrc = false;
+  private FileAction action;
 
   /**
    * Constructor, taking a description of the action.
    * @param description Verbose description of the copy operation.
    */
-  public RetriableFileCopyCommand(String description) {
+  public RetriableFileCopyCommand(String description, FileAction action) {
     super(description);
+    this.action = action;
   }
 
   /**
@@ -68,9 +71,11 @@ public class RetriableFileCopyCommand extends RetriableCommand {
    *
    * @param skipCrc Whether to skip the crc check.
    * @param description A verbose description of the copy operation.
+   * @param action We should overwrite the target file or append new data to it.
    */
-  public RetriableFileCopyCommand(boolean skipCrc, String description) {
-    this(description);
+  public RetriableFileCopyCommand(boolean skipCrc, String description,
+      FileAction action) {
+    this(description, action);
     this.skipCrc = skipCrc;
   }
 
@@ -96,18 +101,17 @@ public class RetriableFileCopyCommand extends RetriableCommand {
   }
 
   private long doCopy(FileStatus sourceFileStatus, Path target,
-                      Mapper.Context context,
-                      EnumSet<FileAttribute> fileAttributes)
-          throws IOException {
-
-    Path tmpTargetPath = getTmpFile(target, context);
+      Mapper.Context context, EnumSet<FileAttribute> fileAttributes)
+      throws IOException {
+    final boolean toAppend = action == FileAction.APPEND;
+    Path targetPath = toAppend ? target : getTmpFile(target, context);
     final Configuration configuration = context.getConfiguration();
     FileSystem targetFS = target.getFileSystem(configuration);
 
     try {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Copying " + sourceFileStatus.getPath() + " to " + target);
-        LOG.debug("Tmp-file path: " + tmpTargetPath);
+        LOG.debug("Target file path: " + targetPath);
       }
       final Path sourcePath = sourceFileStatus.getPath();
       final FileSystem sourceFS = sourcePath.getFileSystem(configuration);
@@ -115,22 +119,31 @@ public class RetriableFileCopyCommand extends RetriableCommand {
           .contains(FileAttribute.CHECKSUMTYPE) ? sourceFS
           .getFileChecksum(sourcePath) : null;
 
-      long bytesRead = copyToTmpFile(tmpTargetPath, targetFS, sourceFileStatus,
-          context, fileAttributes, sourceChecksum);
+      final long offset = action == FileAction.APPEND ? targetFS.getFileStatus(
+          target).getLen() : 0;
+      long bytesRead = copyToFile(targetPath, targetFS, sourceFileStatus,
+          offset, context, fileAttributes, sourceChecksum);
 
 
-      compareFileLengths(sourceFileStatus, tmpTargetPath, configuration,
-          bytesRead);
+      compareFileLengths(sourceFileStatus, targetPath, configuration, bytesRead
+          + offset);
       //At this point, src&dest lengths are same. if length==0, we skip checksum
       if ((bytesRead != 0) && (!skipCrc)) {
         compareCheckSums(sourceFS, sourceFileStatus.getPath(), sourceChecksum,
-            targetFS, tmpTargetPath);
+            targetFS, targetPath);
+      }
+      // it's not append case, thus we first write to a temporary file, rename
+      // it to the target path.
+      if (!toAppend) {
+        promoteTmpToTarget(targetPath, target, targetFS);
       }
-      promoteTmpToTarget(tmpTargetPath, target, targetFS);
       return bytesRead;
-
     } finally {
-      if (targetFS.exists(tmpTargetPath))
-        targetFS.delete(tmpTargetPath, false);
+      // note that for append case, it is possible that we append partial data
+      // and then fail. In that case, for the next retry, we either reuse the
+      // partial appended data if it is good or we overwrite the whole file
+      if (!toAppend && targetFS.exists(targetPath)) {
+        targetFS.delete(targetPath, false);
+      }
     }
   }
 
@@ -147,29 +160,37 @@ public class RetriableFileCopyCommand extends RetriableCommand {
     return null;
   }
 
-  private long copyToTmpFile(Path tmpTargetPath, FileSystem targetFS,
-      FileStatus sourceFileStatus, Mapper.Context context,
+  private long copyToFile(Path targetPath, FileSystem targetFS,
+      FileStatus sourceFileStatus, long sourceOffset, Mapper.Context context,
       EnumSet<FileAttribute> fileAttributes, final FileChecksum sourceChecksum)
       throws IOException {
     FsPermission permission = FsPermission.getFileDefault().applyUMask(
         FsPermission.getUMask(targetFS.getConf()));
-    OutputStream outStream = new BufferedOutputStream(
-        targetFS.create(tmpTargetPath, permission,
-            EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), BUFFER_SIZE,
-            getReplicationFactor(fileAttributes, sourceFileStatus, targetFS,
-                tmpTargetPath),
-            getBlockSize(fileAttributes, sourceFileStatus, targetFS,
-                tmpTargetPath),
-            context, getChecksumOpt(fileAttributes, sourceChecksum)));
-    return copyBytes(sourceFileStatus, outStream, BUFFER_SIZE, context);
+    final OutputStream outStream;
+    if (action == FileAction.OVERWRITE) {
+      final short repl = getReplicationFactor(fileAttributes, sourceFileStatus,
+          targetFS, targetPath);
+      final long blockSize = getBlockSize(fileAttributes, sourceFileStatus,
+          targetFS, targetPath);
+      FSDataOutputStream out = targetFS.create(targetPath, permission,
+          EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
+          BUFFER_SIZE, repl, blockSize, context,
+          getChecksumOpt(fileAttributes, sourceChecksum));
+      outStream = new BufferedOutputStream(out);
+    } else {
+      outStream = new BufferedOutputStream(targetFS.append(targetPath,
+          BUFFER_SIZE));
+    }
+    return copyBytes(sourceFileStatus, sourceOffset, outStream, BUFFER_SIZE,
+        context);
   }
 
   private void compareFileLengths(FileStatus sourceFileStatus, Path target,
-                                  Configuration configuration, long bytesRead)
+                                  Configuration configuration, long targetLen)
                                   throws IOException {
     final Path sourcePath = sourceFileStatus.getPath();
     FileSystem fs = sourcePath.getFileSystem(configuration);
-    if (fs.getFileStatus(sourcePath).getLen() != bytesRead)
+    if (fs.getFileStatus(sourcePath).getLen() != targetLen)
       throw new IOException("Mismatch in length of source:" + sourcePath
                 + " and target:" + target);
   }
@@ -215,8 +236,8 @@ public class RetriableFileCopyCommand extends RetriableCommand {
   }
 
   @VisibleForTesting
-  long copyBytes(FileStatus sourceFileStatus, OutputStream outStream,
-                         int bufferSize, Mapper.Context context)
+  long copyBytes(FileStatus sourceFileStatus, long sourceOffset,
+      OutputStream outStream, int bufferSize, Mapper.Context context)
       throws IOException {
     Path source = sourceFileStatus.getPath();
     byte buf[] = new byte[bufferSize];
@@ -225,19 +246,21 @@ public class RetriableFileCopyCommand extends RetriableCommand {
 
     try {
       inStream = getInputStream(source, context.getConfiguration());
-      int bytesRead = readBytes(inStream, buf);
+      int bytesRead = readBytes(inStream, buf, sourceOffset);
       while (bytesRead >= 0) {
         totalBytesRead += bytesRead;
+        if (action == FileAction.APPEND) {
+          sourceOffset += bytesRead;
+        }
         outStream.write(buf, 0, bytesRead);
         updateContextStatus(totalBytesRead, context, sourceFileStatus);
-        bytesRead = inStream.read(buf);
+        bytesRead = readBytes(inStream, buf, sourceOffset);
       }
       outStream.close();
       outStream = null;
     } finally {
       IOUtils.cleanup(LOG, outStream, inStream);
     }
-
     return totalBytesRead;
   }
 
@@ -254,24 +277,27 @@ public class RetriableFileCopyCommand extends RetriableCommand {
     context.setStatus(message.toString());
   }
 
-  private static int readBytes(InputStream inStream, byte buf[])
-          throws IOException {
+  private static int readBytes(ThrottledInputStream inStream, byte buf[],
+      long position) throws IOException {
     try {
-      return inStream.read(buf);
-    }
-    catch (IOException e) {
+      if (position == 0) {
+        return inStream.read(buf);
+      } else {
+        return inStream.read(position, buf, 0, buf.length);
+      }
+    } catch (IOException e) {
       throw new CopyReadException(e);
     }
   }
 
-  private static ThrottledInputStream getInputStream(Path path, Configuration conf)
-          throws IOException {
+  private static ThrottledInputStream getInputStream(Path path,
+      Configuration conf) throws IOException {
     try {
       FileSystem fs = path.getFileSystem(conf);
       long bandwidthMB = conf.getInt(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
               DistCpConstants.DEFAULT_BANDWIDTH_MB);
-      return new ThrottledInputStream(new BufferedInputStream(fs.open(path)),
-              bandwidthMB * 1024 * 1024);
+      FSDataInputStream in = fs.open(path);
+      return new ThrottledInputStream(in, bandwidthMB * 1024 * 1024);
     }
     catch (IOException e) {
       throw new CopyReadException(e);
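
The OVERWRITE/APPEND split above is driven by CopyMapper's FileAction enum: OVERWRITE creates the target from scratch, while APPEND reopens it with targetFS.append() and copyBytes() starts reading at sourceOffset instead of 0. Below is a minimal, illustrative sketch of how a caller might pick the action. It is not the CopyMapper code from this change (the real change also checks that the existing target content still matches the corresponding source prefix before choosing APPEND), and the class and method names here are made up.

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.tools.mapred.CopyMapper.FileAction;

/** Illustrative sketch only; not part of this patch. */
class AppendActionSketch {
  static FileAction chooseAction(FileStatus source, FileStatus target,
      boolean appendEnabled) {
    // Appending only pays off when the target already holds a shorter copy of
    // the source; otherwise fall back to a full overwrite.
    if (appendEnabled && target != null && target.getLen() < source.getLen()) {
      return FileAction.APPEND;    // copyBytes() starts at sourceOffset = target.getLen()
    }
    return FileAction.OVERWRITE;   // copyBytes() starts at sourceOffset = 0
  }
}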

+ 24 - 0
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java

@@ -21,6 +21,11 @@ package org.apache.hadoop.tools.util;
 import java.io.IOException;
 import java.io.InputStream;
 
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.PositionedReadable;
+
+import com.google.common.base.Preconditions;
+
 /**
  * The ThrottleInputStream provides bandwidth throttling on a specified
  * InputStream. It is implemented as a wrapper on top of another InputStream
@@ -90,6 +95,25 @@ public class ThrottledInputStream extends InputStream {
     return readLen;
   }
 
+  /**
+   * Read bytes starting from the specified position. This requires rawStream is
+   * an instance of {@link PositionedReadable}.
+   */
+  public int read(long position, byte[] buffer, int offset, int length)
+      throws IOException {
+    if (!(rawStream instanceof PositionedReadable)) {
+      throw new UnsupportedOperationException(
+          "positioned read is not supported by the internal stream");
+    }
+    throttle();
+    int readLen = ((PositionedReadable) rawStream).read(position, buffer,
+        offset, length);
+    if (readLen != -1) {
+      bytesRead += readLen;
+    }
+    return readLen;
+  }
+
   private void throttle() throws IOException {
     if (getBytesPerSec() > maxBytesPerSec) {
       try {
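
The positioned read added above only works when the wrapped rawStream implements PositionedReadable, which is exactly why getInputStream() in RetriableFileCopyCommand now hands the FSDataInputStream from fs.open() straight to ThrottledInputStream instead of wrapping it in a BufferedInputStream first. A minimal, illustrative usage sketch follows; the path and the 1 MB/s limit are arbitrary placeholders.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.util.ThrottledInputStream;

/** Illustrative sketch only; not part of this patch. */
public class ThrottledPositionedReadSketch {
  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.get(new Configuration());
    // FSDataInputStream implements PositionedReadable, so positioned reads work.
    FSDataInputStream raw = fs.open(new Path("/tmp/source-file"));
    ThrottledInputStream in = new ThrottledInputStream(raw, 1024L * 1024);
    byte[] buf = new byte[4096];
    // Resume mid-file: read up to buf.length bytes starting at byte 2048,
    // still subject to the bandwidth throttle.
    int n = in.read(2048L, buf, 0, buf.length);
    System.out.println("read " + n + " bytes");
    in.close();
    // Wrapping fs.open() in a BufferedInputStream first (as the old code did)
    // would make this read throw UnsupportedOperationException, because
    // BufferedInputStream is not PositionedReadable.
  }
}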

+ 44 - 0
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java

@@ -18,9 +18,12 @@
 
 package org.apache.hadoop.tools;
 
+import static org.junit.Assert.fail;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.tools.DistCpOptions.*;
 import org.apache.hadoop.conf.Configuration;
 
@@ -554,4 +557,45 @@ public class TestOptionsParser {
     Assert.assertEquals(conf.get(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel()), "U");
     Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), 11);
   }
+
+  @Test
+  public void testAppendOption() {
+    Configuration conf = new Configuration();
+    Assert.assertFalse(conf.getBoolean(
+        DistCpOptionSwitch.APPEND.getConfigLabel(), false));
+    Assert.assertFalse(conf.getBoolean(
+        DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
+
+    DistCpOptions options = OptionsParser.parse(new String[] { "-update",
+        "-append", "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/" });
+    options.appendToConf(conf);
+    Assert.assertTrue(conf.getBoolean(
+        DistCpOptionSwitch.APPEND.getConfigLabel(), false));
+    Assert.assertTrue(conf.getBoolean(
+        DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
+
+    // make sure -append is only valid when -update is specified
+    try {
+      options = OptionsParser.parse(new String[] { "-append",
+              "hdfs://localhost:8020/source/first",
+              "hdfs://localhost:8020/target/" });
+      fail("Append should fail if update option is not specified");
+    } catch (IllegalArgumentException e) {
+      GenericTestUtils.assertExceptionContains(
+          "Append is valid only with update options", e);
+    }
+
+    // make sure -append is invalid when skipCrc is specified
+    try {
+      options = OptionsParser.parse(new String[] {
+          "-append", "-update", "-skipcrccheck",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/target/" });
+      fail("Append should fail if skipCrc option is specified");
+    } catch (IllegalArgumentException e) {
+      GenericTestUtils.assertExceptionContains(
+          "Append is disallowed when skipping CRC", e);
+    }
+  }
 }
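
The two negative cases above pin down the option contract: -append is accepted only together with -update, and it is rejected when -skipcrccheck is also given, since distcp relies on checksum comparison to trust that the bytes already on the target match the source. On the command line the supported combination is therefore: hadoop distcp -update -append <source> <target> (paths are placeholders). A small, illustrative programmatic equivalent of the positive case, mirroring the first half of testAppendOption:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.DistCpOptionSwitch;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.OptionsParser;

/** Illustrative sketch only; the HDFS URIs are placeholders. */
public class AppendOptionSketch {
  public static void main(String[] args) {
    DistCpOptions options = OptionsParser.parse(new String[] {
        "-update", "-append",
        "hdfs://localhost:8020/source/first",
        "hdfs://localhost:8020/target/" });
    Configuration conf = new Configuration();
    options.appendToConf(conf);
    // Both switches end up in the job configuration.
    System.out.println(conf.getBoolean(
        DistCpOptionSwitch.APPEND.getConfigLabel(), false));        // true
    System.out.println(conf.getBoolean(
        DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));  // true
  }
}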

+ 121 - 62
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java

@@ -25,11 +25,13 @@ import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
@@ -118,6 +120,16 @@ public class TestCopyMapper {
     touchFile(SOURCE_PATH + "/7/8/9");
   }
 
+  private static void appendSourceData() throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    for (Path source : pathList) {
+      if (fs.getFileStatus(source).isFile()) {
+        // append 2048 bytes per file
+        appendFile(source, DEFAULT_FILE_SIZE * 2);
+      }
+    }
+  }
+
   private static void createSourceDataWithDifferentBlockSize() throws Exception {
     mkdirs(SOURCE_PATH + "/1");
     mkdirs(SOURCE_PATH + "/2");
@@ -201,85 +213,132 @@ public class TestCopyMapper {
     }
   }
 
+  /**
+   * Append specified length of bytes to a given file
+   */
+  private static void appendFile(Path p, int length) throws IOException {
+    byte[] toAppend = new byte[length];
+    Random random = new Random();
+    random.nextBytes(toAppend);
+    FSDataOutputStream out = cluster.getFileSystem().append(p);
+    try {
+      out.write(toAppend);
+    } finally {
+      IOUtils.closeStream(out);
+    }
+  }
+
   @Test
   public void testCopyWithDifferentChecksumType() throws Exception {
     testCopy(true);
   }
 
   @Test(timeout=40000)
-  public void testRun() {
+  public void testRun() throws Exception {
     testCopy(false);
   }
 
-  private void testCopy(boolean preserveChecksum) {
-    try {
-      deleteState();
-      if (preserveChecksum) {
-        createSourceDataWithDifferentChecksumType();
-      } else {
-        createSourceData();
-      }
+  @Test
+  public void testCopyWithAppend() throws Exception {
+    final FileSystem fs = cluster.getFileSystem();
+    // do the first distcp
+    testCopy(false);
+    // start appending data to source
+    appendSourceData();
 
-      FileSystem fs = cluster.getFileSystem();
-      CopyMapper copyMapper = new CopyMapper();
-      StubContext stubContext = new StubContext(getConfiguration(), null, 0);
-      Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
-              = stubContext.getContext();
+    // do the distcp again with -update and -append option
+    CopyMapper copyMapper = new CopyMapper();
+    StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+    Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
+        stubContext.getContext();
+    // Enable append 
+    context.getConfiguration().setBoolean(
+        DistCpOptionSwitch.APPEND.getConfigLabel(), true);
+    copyMapper.setup(context);
+    for (Path path: pathList) {
+      copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+              new CopyListingFileStatus(cluster.getFileSystem().getFileStatus(
+                  path)), context);
+    }
 
 
-      Configuration configuration = context.getConfiguration();
-      EnumSet<DistCpOptions.FileAttribute> fileAttributes
-              = EnumSet.of(DistCpOptions.FileAttribute.REPLICATION);
-      if (preserveChecksum) {
-        fileAttributes.add(DistCpOptions.FileAttribute.CHECKSUMTYPE);
-      }
-      configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
-              DistCpUtils.packAttributes(fileAttributes));
+    verifyCopy(fs, false);
+    // verify that we only copied new appended data
+    Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE * 2, stubContext
+        .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
+        .getValue());
+    Assert.assertEquals(pathList.size(), stubContext.getReporter().
+        getCounter(CopyMapper.Counter.COPY).getValue());
+  }
 
-      copyMapper.setup(context);
+  private void testCopy(boolean preserveChecksum) throws Exception {
+    deleteState();
+    if (preserveChecksum) {
+      createSourceDataWithDifferentChecksumType();
+    } else {
+      createSourceData();
+    }
 
-      for (Path path: pathList) {
-        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
-                new CopyListingFileStatus(fs.getFileStatus(path)), context);
-      }
+    FileSystem fs = cluster.getFileSystem();
+    CopyMapper copyMapper = new CopyMapper();
+    StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+    Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
+            = stubContext.getContext();
+
+    Configuration configuration = context.getConfiguration();
+    EnumSet<DistCpOptions.FileAttribute> fileAttributes
+            = EnumSet.of(DistCpOptions.FileAttribute.REPLICATION);
+    if (preserveChecksum) {
+      fileAttributes.add(DistCpOptions.FileAttribute.CHECKSUMTYPE);
+    }
+    configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+            DistCpUtils.packAttributes(fileAttributes));
 
-      // Check that the maps worked.
-      for (Path path : pathList) {
-        final Path targetPath = new Path(path.toString()
-                .replaceAll(SOURCE_PATH, TARGET_PATH));
-        Assert.assertTrue(fs.exists(targetPath));
-        Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path));
-        FileStatus sourceStatus = fs.getFileStatus(path);
-        FileStatus targetStatus = fs.getFileStatus(targetPath);
-        Assert.assertEquals(sourceStatus.getReplication(),
-            targetStatus.getReplication());
-        if (preserveChecksum) {
-          Assert.assertEquals(sourceStatus.getBlockSize(),
-              targetStatus.getBlockSize());
-        }
-        Assert.assertTrue(!fs.isFile(targetPath)
-            || fs.getFileChecksum(targetPath).equals(fs.getFileChecksum(path)));
-      }
+    copyMapper.setup(context);
 
-      Assert.assertEquals(pathList.size(),
-              stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue());
-      if (!preserveChecksum) {
-        Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE, stubContext
-            .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
-            .getValue());
-      } else {
-        Assert.assertEquals(nFiles * NON_DEFAULT_BLOCK_SIZE * 2, stubContext
-            .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
-            .getValue());
-      }
+    for (Path path: pathList) {
+      copyMapper.map(
+          new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
+          new CopyListingFileStatus(fs.getFileStatus(path)), context);
+    }
 
 
-      testCopyingExistingFiles(fs, copyMapper, context);
-      for (Text value : stubContext.getWriter().values()) {
-        Assert.assertTrue(value.toString() + " is not skipped", value.toString().startsWith("SKIP:"));
-      }
+    // Check that the maps worked.
+    verifyCopy(fs, preserveChecksum);
+    Assert.assertEquals(pathList.size(), stubContext.getReporter()
+        .getCounter(CopyMapper.Counter.COPY).getValue());
+    if (!preserveChecksum) {
+      Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE, stubContext
+          .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
+          .getValue());
+    } else {
+      Assert.assertEquals(nFiles * NON_DEFAULT_BLOCK_SIZE * 2, stubContext
+          .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
+          .getValue());
     }
     }
-      LOG.error("Unexpected exception: ", e);
-      Assert.assertTrue(false);
+
+    testCopyingExistingFiles(fs, copyMapper, context);
+    for (Text value : stubContext.getWriter().values()) {
+      Assert.assertTrue(value.toString() + " is not skipped", value
+          .toString().startsWith("SKIP:"));
+    }
+  }
+
+  private void verifyCopy(FileSystem fs, boolean preserveChecksum)
+      throws Exception {
+    for (Path path : pathList) {
+      final Path targetPath = new Path(path.toString().replaceAll(SOURCE_PATH,
+          TARGET_PATH));
+      Assert.assertTrue(fs.exists(targetPath));
+      Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path));
+      FileStatus sourceStatus = fs.getFileStatus(path);
+      FileStatus targetStatus = fs.getFileStatus(targetPath);
+      Assert.assertEquals(sourceStatus.getReplication(),
+          targetStatus.getReplication());
+      if (preserveChecksum) {
+        Assert.assertEquals(sourceStatus.getBlockSize(),
+            targetStatus.getBlockSize());
+      }
+      Assert.assertTrue(!fs.isFile(targetPath)
+          || fs.getFileChecksum(targetPath).equals(fs.getFileChecksum(path)));
     }
     }
   }
 
+ 3 - 2
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java

@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.tools.mapred.CopyMapper.FileAction;
 import org.junit.Test;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
@@ -48,8 +49,8 @@ public class TestRetriableFileCopyCommand {
 
     Exception actualEx = null;
     try {
-      new RetriableFileCopyCommand("testFailOnCloseError")
-        .copyBytes(stat, out, 512, context);
+      new RetriableFileCopyCommand("testFailOnCloseError", FileAction.OVERWRITE)
+        .copyBytes(stat, 0, out, 512, context);
     } catch (Exception e) {
       actualEx = e;
     }