Merging changes r1087114:r1087900 from trunk to federation

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1089945 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 14 years ago
parent
commit
adaa771cc8

+ 7 - 0
CHANGES.txt

@@ -251,6 +251,10 @@ Trunk (unreleased changes)
     HDFS-1791. Federation: Add command to delete block pool directories 
     from a datanode. (jitendra)
 
+    HDFS-1761. Add a new DataTransferProtocol operation, Op.TRANSFER_BLOCK,
+    for transferring RBW/Finalized with acknowledgement and without using RPC.
+    (szetszwo)
+
   IMPROVEMENTS
 
     HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)
@@ -387,6 +391,9 @@ Trunk (unreleased changes)
     HDFS-1611. Fix up some log messages in DFSClient and MBean registration
     (Uma Maheswara Rao G via todd)
 
+    HDFS-1543. Reduce dev. cycle time by moving system testing artifacts from
+    default build and push to maven for HDFS (Luke Lu via cos)
+
 Release 0.22.0 - Unreleased
 
   NEW FEATURES

+ 4 - 1
build.xml

@@ -1550,7 +1550,7 @@
          uri="urn:maven-artifact-ant" classpathref="mvn-ant-task.classpath"/>
   </target>   
 
-  <target name="mvn-install" depends="mvn-taskdef,jar,jar-test,set-version,-mvn-system-install"
+  <target name="mvn-install" depends="mvn-taskdef,jar,jar-test,set-version"
      description="To install hadoop hdfs and test jars to local filesystem's m2 cache">
      <artifact:pom file="${hadoop-hdfs.pom}" id="hadoop.hdfs"/>
      <artifact:pom file="${hadoop-hdfs-test.pom}" id="hadoop.hdfs.test"/>
@@ -1563,6 +1563,9 @@
         <attach file="${hadoop-hdfs-test-sources.jar}" classifier="sources" />
      </artifact:install>
    </target>
+
+   <target name="mvn-si-install" depends="mvn-install,-mvn-system-install"
+           description="Install system integration tests jars as well"/>
   
    <target name="mvn-deploy" depends="mvn-taskdef, jar, jar-test,
      jar-system, jar-test-system, set-version, signanddeploy, simpledeploy"

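This build.xml change drops -mvn-system-install from the mvn-install dependency chain and adds a separate mvn-si-install target, so the default Maven install no longer builds the system-test artifacts (the dev-cycle improvement recorded as HDFS-1543 above). Assuming the usual Ant invocation from the project root, the two targets would be used as:

    ant mvn-install      # install the hadoop-hdfs and test jars to the local m2 cache
    ant mvn-si-install   # additionally install the system integration test jars
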
+ 71 - 17
src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java

@@ -46,7 +46,7 @@ public interface DataTransferProtocol {
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 21:
+   * Version 22:
    *    Changed the protocol methods to use ExtendedBlock instead
    *    of Block.
    */
@@ -59,7 +59,8 @@ public interface DataTransferProtocol {
     READ_METADATA((byte)82),
     REPLACE_BLOCK((byte)83),
     COPY_BLOCK((byte)84),
-    BLOCK_CHECKSUM((byte)85);
+    BLOCK_CHECKSUM((byte)85),
+    TRANSFER_BLOCK((byte)86);
 
     /** The code for this operation. */
     public final byte code;
@@ -145,8 +146,10 @@ public interface DataTransferProtocol {
     PIPELINE_CLOSE_RECOVERY,
     // pipeline set up for block creation
     PIPELINE_SETUP_CREATE,
-    // similar to replication but transferring rbw instead of finalized
-    TRANSFER_RBW;
+    // transfer RBW for adding datanodes
+    TRANSFER_RBW,
+    // transfer Finalized for adding datanodes
+    TRANSFER_FINALIZED;
     
     final static private byte RECOVERY_BIT = (byte)1;
     
@@ -265,14 +268,23 @@ public interface DataTransferProtocol {
       if (src != null) {
         src.write(out);
       }
-      out.writeInt(targets.length - 1);
-      for (int i = 1; i < targets.length; i++) {
-        targets[i].write(out);
-      }
+      write(out, 1, targets);
+      blockToken.write(out);
+    }
+
+    /** Send {@link Op#TRANSFER_BLOCK} */
+    public static void opTransferBlock(DataOutputStream out, ExtendedBlock blk,
+        String client, DatanodeInfo[] targets,
+        Token<BlockTokenIdentifier> blockToken) throws IOException {
+      op(out, Op.TRANSFER_BLOCK);
 
+      blk.writeId(out);
+      Text.writeString(out, client);
+      write(out, 0, targets);
       blockToken.write(out);
+      out.flush();
     }
-    
+
     /** Send OP_REPLACE_BLOCK */
     public static void opReplaceBlock(DataOutputStream out,
         ExtendedBlock blk, String storageId, DatanodeInfo src,
@@ -307,6 +319,16 @@ public interface DataTransferProtocol {
       blockToken.write(out);
       out.flush();
     }
+
+    /** Write an array of {@link DatanodeInfo} */
+    private static void write(final DataOutputStream out,
+        final int start, 
+        final DatanodeInfo[] datanodeinfos) throws IOException {
+      out.writeInt(datanodeinfos.length - start);
+      for (int i = start; i < datanodeinfos.length; i++) {
+        datanodeinfos[i].write(out);
+      }
+    }
   }
 
   /** Receiver */
@@ -341,6 +363,9 @@ public interface DataTransferProtocol {
       case BLOCK_CHECKSUM:
         opBlockChecksum(in);
         break;
+      case TRANSFER_BLOCK:
+        opTransferBlock(in);
+        break;
       default:
         throw new IOException("Unknown op " + op + " in data stream");
       }
@@ -378,14 +403,7 @@ public interface DataTransferProtocol {
       final String client = Text.readString(in); // working on behalf of this client
       final DatanodeInfo src = in.readBoolean()? DatanodeInfo.read(in): null;
 
-      final int nTargets = in.readInt();
-      if (nTargets < 0) {
-        throw new IOException("Mislabelled incoming datastream.");
-      }
-      final DatanodeInfo targets[] = new DatanodeInfo[nTargets];
-      for (int i = 0; i < targets.length; i++) {
-        targets[i] = DatanodeInfo.read(in);
-      }
+      final DatanodeInfo targets[] = readDatanodeInfos(in);
       final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
 
       opWriteBlock(in, blk, pipelineSize, stage,
@@ -402,6 +420,27 @@ public interface DataTransferProtocol {
         DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
         throws IOException;
 
+    /** Receive {@link Op#TRANSFER_BLOCK} */
+    private void opTransferBlock(DataInputStream in) throws IOException {
+      final ExtendedBlock blk = new ExtendedBlock();
+      blk.readId(in);
+      final String client = Text.readString(in);
+      final DatanodeInfo targets[] = readDatanodeInfos(in);
+      final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
+
+      opTransferBlock(in, blk, client, targets, blockToken);
+    }
+
+    /**
+     * Abstract {@link Op#TRANSFER_BLOCK} method.
+     * For {@link BlockConstructionStage#TRANSFER_RBW}
+     * or {@link BlockConstructionStage#TRANSFER_FINALIZED}.
+     */
+    protected abstract void opTransferBlock(DataInputStream in, ExtendedBlock blk,
+        String client, DatanodeInfo[] targets,
+        Token<BlockTokenIdentifier> blockToken)
+        throws IOException;
+
     /** Receive OP_REPLACE_BLOCK */
     private void opReplaceBlock(DataInputStream in) throws IOException {
       final ExtendedBlock blk = new ExtendedBlock();
@@ -455,6 +494,21 @@ public interface DataTransferProtocol {
         ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken)
         throws IOException;
 
+    /** Read an array of {@link DatanodeInfo} */
+    private static DatanodeInfo[] readDatanodeInfos(final DataInputStream in
+        ) throws IOException {
+      final int n = in.readInt();
+      if (n < 0) {
+        throw new IOException("Mislabelled incoming datastream: "
+            + n + " = n < 0");
+      }
+      final DatanodeInfo[] datanodeinfos= new DatanodeInfo[n];
+      for (int i = 0; i < datanodeinfos.length; i++) {
+        datanodeinfos[i] = DatanodeInfo.read(in);
+      }
+      return datanodeinfos;
+    }
+
     /** Read an AccessToken */
     static private Token<BlockTokenIdentifier> readBlockToken(DataInputStream in
         ) throws IOException {

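The new Op.TRANSFER_BLOCK (opcode 86) carries the block id, the client name, the target list and a block token; the sender flushes the stream and the caller then reads a single status reply. A minimal sketch of a caller, modelled on the DFSTestUtil.transferRbw helper added later in this commit (the wrapper method and its parameter names are illustrative only, not part of the patch):

    static DataTransferProtocol.Status sendTransferBlock(Socket sock,
        ExtendedBlock blk, String clientName, DatanodeInfo target,
        Token<BlockTokenIdentifier> blockToken, long writeTimeout) throws IOException {
      final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
          NetUtils.getOutputStream(sock, writeTimeout)));
      final DataInputStream in = new DataInputStream(NetUtils.getInputStream(sock));
      // Sender.opTransferBlock writes the op code, the block id fields, the client
      // name, the target count plus targets, and the block token, then flushes.
      DataTransferProtocol.Sender.opTransferBlock(out, blk, clientName,
          new DatanodeInfo[]{ target }, blockToken);
      out.flush();
      // The datanode replies with a single close-ack status (SUCCESS or an error).
      return DataTransferProtocol.Status.read(in);
    }
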
+ 34 - 14
src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -85,14 +85,15 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
   /** The client name.  It is empty if a datanode is the client */
   private final String clientname;
   private final boolean isClient; 
-  private final boolean isDatanode; 
+  private final boolean isDatanode;
 
   /** the block to receive */
   private final ExtendedBlock block; 
   /** the replica to write */
   private final ReplicaInPipelineInterface replicaInfo;
   /** pipeline stage */
-  private final BlockConstructionStage initialStage;
+  private final BlockConstructionStage stage;
+  private final boolean isTransfer;
 
   BlockReceiver(final ExtendedBlock block, final DataInputStream in,
       final String inAddr, final String myAddr,
@@ -114,8 +115,19 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
 
       //for datanode, we have
       //1: clientName.length() == 0, and
-      //2: stage == null, PIPELINE_SETUP_CREATE or TRANSFER_RBW
-      this.initialStage = stage;
+      //2: stage == null or PIPELINE_SETUP_CREATE
+      this.stage = stage;
+      this.isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
+          || stage == BlockConstructionStage.TRANSFER_FINALIZED;
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getClass().getSimpleName() + ": " + block
+            + "\n  isClient  =" + isClient + ", clientname=" + clientname
+            + "\n  isDatanode=" + isDatanode + ", srcDataNode=" + srcDataNode
+            + "\n  inAddr=" + inAddr + ", myAddr=" + myAddr
+            );
+      }
+
       //
       // Open local disk out
       //
@@ -147,6 +159,11 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
           }
           block.setGenerationStamp(newGs);
           break;
+        case TRANSFER_RBW:
+        case TRANSFER_FINALIZED:
+          // this is a transfer destination
+          replicaInfo = datanode.data.createTemporary(block);
+          break;
         default: throw new IOException("Unsupported stage " + stage + 
               " while receiving block " + block + " from " + inAddr);
         }
@@ -156,7 +173,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
       this.bytesPerChecksum = checksum.getBytesPerChecksum();
       this.checksumSize = checksum.getChecksumSize();
       
-      final boolean isCreate = isDatanode 
+      final boolean isCreate = isDatanode || isTransfer 
           || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
       streams = replicaInfo.createStreams(isCreate,
           this.bytesPerChecksum, this.checksumSize);
@@ -643,7 +660,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
       throttler = throttlerArg;
 
     try {
-      if (isClient) {
+      if (isClient && !isTransfer) {
         responder = new Daemon(datanode.threadGroup, 
             new PacketResponder(this, block, mirrIn, replyOut, 
                                 numTargets, Thread.currentThread()));
@@ -663,16 +680,20 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
         responderClosed = true;
       }
 
-      // if this write is for a replication request (and not
-      // from a client), then finalize block. For client-writes, 
-      // the block is finalized in the PacketResponder.
-      if (isDatanode) {
+      // If this write is for a replication or transfer-RBW/Finalized,
+      // then finalize block or convert temporary to RBW.
+      // For client-writes, the block is finalized in the PacketResponder.
+      if (isDatanode || isTransfer) {
         // close the block/crc files
         close();
+        block.setNumBytes(replicaInfo.getNumBytes());
 
-        if (initialStage != BlockConstructionStage.TRANSFER_RBW) {
+        if (stage == BlockConstructionStage.TRANSFER_RBW) {
+          // for TRANSFER_RBW, convert temporary to RBW
+          datanode.data.convertTemporaryToRbw(block);
+        } else {
+          // for isDatanode or TRANSFER_FINALIZED
           // Finalize the block. Does this fsync()?
-          block.setNumBytes(replicaInfo.getNumBytes());
           datanode.data.finalizeBlock(block);
         }
         datanode.myMetrics.blocksWritten.inc();
@@ -705,8 +726,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
    * if this write is for a replication request (and not from a client)
    */
   private void cleanupBlock() throws IOException {
-    if (isDatanode
-        && initialStage != BlockConstructionStage.TRANSFER_RBW) {
+    if (isDatanode) {
       datanode.data.unfinalizeBlock(block);
     }
   }

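For the two transfer stages the receiver behaves like a single-node write: the incoming replica is created as TEMPORARY, no PacketResponder thread is started, and when the stream ends the receiver itself promotes the replica. A condensed sketch of that logic, paraphrased from the hunks above (not the verbatim method bodies):

    // setup: a transfer destination writes into a temporary replica
    boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
        || stage == BlockConstructionStage.TRANSFER_FINALIZED;
    if (isTransfer) {
      replicaInfo = datanode.data.createTemporary(block);
    }

    // completion: transfers have no PacketResponder, so promote the replica here
    if (isDatanode || isTransfer) {
      close();                                        // close the block/crc files
      block.setNumBytes(replicaInfo.getNumBytes());
      if (stage == BlockConstructionStage.TRANSFER_RBW) {
        datanode.data.convertTemporaryToRbw(block);   // destination keeps an RBW
      } else {
        datanode.data.finalizeBlock(block);           // replication or TRANSFER_FINALIZED
      }
    }
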
+ 101 - 68
src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -19,9 +19,12 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 
 import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
@@ -64,7 +67,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -72,6 +74,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -79,19 +82,18 @@ import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
-import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
@@ -100,6 +102,7 @@ import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -109,7 +112,6 @@ import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.ProtocolSignature;
@@ -125,13 +127,13 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.mortbay.util.ajax.JSON;
 
 /**********************************************************
@@ -298,7 +300,7 @@ public class DataNode extends Configured
     void refreshNamenodes(Configuration conf)
         throws IOException, InterruptedException {
       LOG.info("Refresh request received for nameservices: "
-          + conf.get(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES));
+          + conf.get(DFS_FEDERATION_NAMESERVICES));
       List<InetSocketAddress> newAddresses = 
         DFSUtil.getNNServiceRpcAddresses(conf);
       List<BPOfferService> toShutdown = new ArrayList<BPOfferService>();
@@ -423,8 +425,8 @@ public class DataNode extends Configured
       throws UnknownHostException {
     String name = null;
     // use configured nameserver & interface to get local hostname
-    if (config.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY) != null) {
-      name = config.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY);
+    if (config.get(DFS_DATANODE_HOST_NAME_KEY) != null) {
+      name = config.get(DFS_DATANODE_HOST_NAME_KEY);
     }
     if (name == null) {
       name = DNS.getDefaultHost(config.get("dfs.datanode.dns.interface",
@@ -434,31 +436,31 @@ public class DataNode extends Configured
   }
 
   private void initConfig(Configuration conf) {
-    this.socketTimeout =  conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
+    this.socketTimeout =  conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
                                       HdfsConstants.READ_TIMEOUT);
-    this.socketWriteTimeout = conf.getInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
+    this.socketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
                                           HdfsConstants.WRITE_TIMEOUT);
     /* Based on results on different platforms, we might need set the default 
      * to false on some of them. */
     this.transferToAllowed = conf.getBoolean("dfs.datanode.transferTo.allowed", 
                                              true);
-    this.writePacketSize = conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 
-                                       DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
+    this.writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 
+                                       DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
 
     this.blockReportInterval =
-      conf.getLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, BLOCKREPORT_INTERVAL);
-    this.initialBlockReportDelay = conf.getLong(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
+      conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, BLOCKREPORT_INTERVAL);
+    this.initialBlockReportDelay = conf.getLong(DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
                                             BLOCKREPORT_INITIAL_DELAY)* 1000L; 
     if (this.initialBlockReportDelay >= blockReportInterval) {
       this.initialBlockReportDelay = 0;
       LOG.info("dfs.blockreport.initialDelay is greater than " +
         "dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:");
     }
-    this.heartBeatInterval = conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL) * 1000L;
+    this.heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL) * 1000L;
 
     // do we need to sync block file contents to disk when blockfile is closed?
-    this.syncOnClose = conf.getBoolean(DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY, 
-                                       DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT);
+    this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY, 
+                                       DFS_DATANODE_SYNCONCLOSE_DEFAULT);
   }
   
   private void startInfoServer(Configuration conf) throws IOException {
@@ -468,16 +470,16 @@ public class DataNode extends Configured
     int tmpInfoPort = infoSocAddr.getPort();
     this.infoServer = (secureResources == null) 
        ? new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0, 
-           conf, new AccessControlList(conf.get(DFSConfigKeys.DFS_ADMIN, " ")))
+           conf, new AccessControlList(conf.get(DFS_ADMIN, " ")))
        : new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0,
-           conf, new AccessControlList(conf.get(DFSConfigKeys.DFS_ADMIN, " ")),
+           conf, new AccessControlList(conf.get(DFS_ADMIN, " ")),
            secureResources.getListener());
     if(LOG.isDebugEnabled()) {
       LOG.debug("Datanode listening on " + infoHost + ":" + tmpInfoPort);
     }
     if (conf.getBoolean("dfs.https.enable", false)) {
-      boolean needClientAuth = conf.getBoolean(DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
-                                               DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT);
+      boolean needClientAuth = conf.getBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
+                                               DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT);
       InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(
           "dfs.datanode.https.address", infoHost + ":" + 0));
       Configuration sslConf = new HdfsConfiguration(false);
@@ -1399,7 +1401,7 @@ public class DataNode extends Configured
       storage.createStorageID();
       // it would have been better to pass storage as a parameter to
       // constructor below - need to augment ReflectionUtils used below.
-      conf.set(DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY, getStorageId());
+      conf.set(DFS_DATANODE_STORAGEID_KEY, getStorageId());
       try {
         data = (FSDatasetInterface) ReflectionUtils.newInstance(
             Class.forName(
@@ -1786,7 +1788,7 @@ public class DataNode extends Configured
       }
 
       new Daemon(new DataTransfer(xferTargets, block,
-          BlockConstructionStage.PIPELINE_SETUP_CREATE)).start();
+          BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start();
     }
   }
 
@@ -1894,18 +1896,26 @@ public class DataNode extends Configured
     final ExtendedBlock b;
     final BlockConstructionStage stage;
     final private DatanodeRegistration bpReg;
+    final String clientname;
 
     /**
      * Connect to the first item in the target list.  Pass along the 
      * entire target list, the block, and the data.
      */
-    DataTransfer(DatanodeInfo targets[], ExtendedBlock b, BlockConstructionStage stage
-        ) throws IOException {
+    DataTransfer(DatanodeInfo targets[], ExtendedBlock b, BlockConstructionStage stage,
+        final String clientname) throws IOException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getClass().getSimpleName() + ": " + b
+            + ", stage=" + stage
+            + ", clientname=" + clientname
+            + ", targets=" + Arrays.asList(targets));
+      }
       this.targets = targets;
       this.b = b;
       this.stage = stage;
       BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId());
       bpReg = bpos.bpRegistration;
+      this.clientname = clientname;
     }
 
     /**
@@ -1915,7 +1925,9 @@ public class DataNode extends Configured
       xmitsInProgress.getAndIncrement();
       Socket sock = null;
       DataOutputStream out = null;
+      DataInputStream in = null;
       BlockSender blockSender = null;
+      final boolean isClient = clientname.length() > 0;
       
       try {
         InetSocketAddress curTarget = 
@@ -1929,7 +1941,6 @@ public class DataNode extends Configured
         OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout);
         out = new DataOutputStream(new BufferedOutputStream(baseStream, 
                                                             SMALL_BUFFER_SIZE));
-
         blockSender = new BlockSender(b, 0, b.getNumBytes(), 
             false, false, false, DataNode.this);
         DatanodeInfo srcNode = new DatanodeInfo(bpReg);
@@ -1944,14 +1955,33 @@ public class DataNode extends Configured
         }
 
         DataTransferProtocol.Sender.opWriteBlock(out,
-            b, 0, stage, 0, 0, 0, "", srcNode, targets, accessToken);
+            b, 0, stage, 0, 0, 0, clientname, srcNode, targets, accessToken);
 
         // send data & checksum
         blockSender.sendBlock(out, baseStream, null);
 
         // no response necessary
-        LOG.info(bpReg + ":Transmitted block " + b + " to " + curTarget);
-
+        LOG.info(getClass().getSimpleName() + ": Transmitted " + b
+            + " (numBytes=" + b.getNumBytes() + ") to " + curTarget);
+
+        // read ack
+        if (isClient) {
+          in = new DataInputStream(NetUtils.getInputStream(sock));
+          final DataTransferProtocol.Status s = DataTransferProtocol.Status.read(in);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(getClass().getSimpleName() + ": close-ack=" + s);
+          }
+          if (s != SUCCESS) {
+            if (s == ERROR_ACCESS_TOKEN) {
+              throw new InvalidBlockTokenException(
+                  "Got access token error for connect ack, targets="
+                   + Arrays.asList(targets));
+            } else {
+              throw new IOException("Bad connect ack, targets="
+                  + Arrays.asList(targets));
+            }
+          }
+        }
       } catch (IOException ie) {
         LOG.warn(bpReg + ":Failed to transfer " + b + " to " + targets[0].getName()
             + " got " + StringUtils.stringifyException(ie));
@@ -1962,6 +1992,7 @@ public class DataNode extends Configured
         xmitsInProgress.getAndDecrement();
         IOUtils.closeStream(blockSender);
         IOUtils.closeStream(out);
+        IOUtils.closeStream(in);
         IOUtils.closeSocket(sock);
       }
     }
@@ -2051,14 +2082,14 @@ public class DataNode extends Configured
     dnThreadName = "DataNode: [" +
                     StringUtils.uriToString(dataDirs.toArray(new URI[0])) + "]";
    UserGroupInformation.setConfiguration(conf);
-    SecurityUtil.login(conf, DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY,
-        DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY);
+    SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
+        DFS_DATANODE_USER_NAME_KEY);
     return makeInstance(dataDirs, conf, resources);
   }
 
   static Collection<URI> getStorageDirs(Configuration conf) {
     Collection<String> dirNames =
-      conf.getTrimmedStringCollection(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+      conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
     return Util.stringCollectionAsURIs(dirNames);
   }
 
@@ -2110,8 +2141,8 @@ public class DataNode extends Configured
       SecureResources resources) throws IOException {
     LocalFileSystem localFS = FileSystem.getLocal(conf);
     FsPermission permission = new FsPermission(
-        conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
-                 DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
+        conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
+                 DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
     ArrayList<File> dirs = getDataDirsFromURIs(dataDirs, localFS, permission);
 
     assert dirs.size() > 0 : "number of data directories should be > 0";
@@ -2136,13 +2167,13 @@ public class DataNode extends Configured
         dirs.add(data);
       } catch (IOException e) {
         LOG.warn("Invalid directory in: "
-                 + DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY + ": ", e);
+                 + DFS_DATANODE_DATA_DIR_KEY + ": ", e);
         invalidDirs.append("\"").append(data.getCanonicalPath()).append("\" ");
       }
     }
     if (dirs.size() == 0)
       throw new IOException("All directories in "
-          + DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY + " are invalid: "
+          + DFS_DATANODE_DATA_DIR_KEY + " are invalid: "
           + invalidDirs);
     return dirs;
   }
@@ -2536,48 +2567,50 @@ public class DataNode extends Configured
   }
 
   /**
-   * Transfer a block to the datanode targets.
-   * @return rbw's visible length
+   * Transfer a replica to the datanode targets.
+   * @param b the block to transfer.
+   *          The corresponding replica must be an RBW or a Finalized.
+   *          Its GS and numBytes will be set to
+   *          the stored GS and the visible length. 
+   * @param targets
+   * @param client
+   * @return whether the replica is an RBW
    */
-  long transferBlockForPipelineRecovery(final ExtendedBlock b,
-      final DatanodeInfo[] targets) throws IOException {
+  boolean transferReplicaForPipelineRecovery(final ExtendedBlock b,
+      final DatanodeInfo[] targets, final String client) throws IOException {
     checkWriteAccess(b);
-    final Block stored;
-    final boolean isRbw;
+
+    final long storedGS;
     final long visible;
+    final BlockConstructionStage stage;
 
     //get replica information
     synchronized(data) {
-      stored = data.getStoredBlock(b.getBlockPoolId(), b.getBlockId());
-      if (stored.getGenerationStamp() < b.getGenerationStamp()) {
+      if (data.isValidRbw(b)) {
+        stage = BlockConstructionStage.TRANSFER_RBW;
+      } else if (data.isValidBlock(b)) {
+        stage = BlockConstructionStage.TRANSFER_FINALIZED;
+      } else {
+        throw new IOException(b + " is not a RBW or a Finalized");
+      }
+
+      storedGS = data.getStoredBlock(b.getBlockPoolId(),
+          b.getBlockId()).getGenerationStamp();
+      if (storedGS < b.getGenerationStamp()) {
         throw new IOException(
-            "stored.getGenerationStamp() < b.getGenerationStamp(), stored="
-            + stored + ", b=" + b);        
+            storedGS + " = storedGS < b.getGenerationStamp(), b=" + b);        
       }
-      isRbw = data.isValidRbw(b);
       visible = data.getReplicaVisibleLength(b);
     }
 
+    //set storedGS and visible length
+    b.setGenerationStamp(storedGS);
+    b.setNumBytes(visible);
+
     if (targets.length > 0) {
-      if (isRbw) {
-        //transfer rbw
-        new DataTransfer(targets, b, BlockConstructionStage.TRANSFER_RBW).run();
-      } else {
-        //transfer finalized replica
-        transferBlock(new ExtendedBlock(b.getBlockPoolId(), stored), targets);
-      }
+      new DataTransfer(targets, b, stage, client).run();
     }
-    //TODO: should return: visible + storedGS + isRbw
-    return visible;
-  }
-
-  /**
-   * Covert an existing temporary replica to a rbw. 
-   * @param temporary specifies id, gs and visible bytes.
-   * @throws IOException
-   */
-  void convertTemporaryToRbw(final ExtendedBlock temporary) throws IOException {
-    data.convertTemporaryToRbw(temporary);
+    return stage == BlockConstructionStage.TRANSFER_RBW;
   }
 
   // Determine a Datanode's streaming address

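transferBlockForPipelineRecovery is replaced by transferReplicaForPipelineRecovery: the transfer stage is derived from the replica's state, the block is stamped with the stored generation stamp and the visible length, and the DataTransfer runs inline (not as a daemon) with the client name attached so the close-ack is read back. A condensed sketch of the new method's core, paraphrased from the hunk above:

    final BlockConstructionStage stage;
    if (data.isValidRbw(b)) {
      stage = BlockConstructionStage.TRANSFER_RBW;        // replica still being written
    } else if (data.isValidBlock(b)) {
      stage = BlockConstructionStage.TRANSFER_FINALIZED;  // replica already finalized
    } else {
      throw new IOException(b + " is not a RBW or a Finalized");
    }
    b.setGenerationStamp(storedGS);                       // GS stored in the dataset
    b.setNumBytes(visible);                               // replica's visible length
    if (targets.length > 0) {
      new DataTransfer(targets, b, stage, client).run();  // synchronous; reads the ack
    }
    return stage == BlockConstructionStage.TRANSFER_RBW;
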
+ 49 - 4
src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -32,6 +32,7 @@ import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketException;
+import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
@@ -220,8 +221,25 @@ class DataXceiver extends DataTransferProtocol.Receiver
     updateCurrentThreadName("Receiving block " + block + " client=" + clientname);
     final boolean isDatanode = clientname.length() == 0;
     final boolean isClient = !isDatanode;
+    final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
+        || stage == BlockConstructionStage.TRANSFER_FINALIZED;
+
+    // check single target for transfer-RBW/Finalized 
+    if (isTransfer && targets.length > 0) {
+      throw new IOException(stage + " does not support multiple targets "
+          + Arrays.asList(targets));
+    }
 
     if (LOG.isDebugEnabled()) {
+      LOG.debug("opWriteBlock: stage=" + stage + ", clientname=" + clientname 
+      		+ "\n  block  =" + block + ", newGs=" + newGs
+      		+ ", bytesRcvd=[" + minBytesRcvd + ", " + maxBytesRcvd + "]"
+          + "\n  targets=" + Arrays.asList(targets)
+          + "; pipelineSize=" + pipelineSize + ", srcDataNode=" + srcDataNode
+          );
+      LOG.debug("isDatanode=" + isDatanode
+          + ", isClient=" + isClient
+          + ", isTransfer=" + isTransfer);
       LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
                 " tcp no delay " + s.getTcpNoDelay());
     }
@@ -331,8 +349,8 @@ class DataXceiver extends DataTransferProtocol.Receiver
         }
       }
 
-      // send connect ack back to source (only for clients)
-      if (isClient) {
+      // send connect-ack to source for clients and not transfer-RBW/Finalized
+      if (isClient && !isTransfer) {
         if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
           LOG.info("Datanode " + targets.length +
                    " forwarding connect ack to upstream firstbadlink is " +
@@ -348,6 +366,14 @@ class DataXceiver extends DataTransferProtocol.Receiver
         String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
         blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
             mirrorAddr, null, targets.length);
+
+        // send close-ack for transfer-RBW/Finalized 
+        if (isTransfer) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("TRANSFER: send close-ack");
+          }
+          SUCCESS.write(replyOut);
+        }
       }
 
       // update its generation stamp
@@ -360,8 +386,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
       // if this write is for a replication request or recovering
       // a failed close for client, then confirm block. For other client-writes,
       // the block is finalized in the PacketResponder.
-      if ((isDatanode  && stage != BlockConstructionStage.TRANSFER_RBW)
-          ||
+      if (isDatanode ||
           stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
         datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
         LOG.info("Received block " + block + 
@@ -389,6 +414,26 @@ class DataXceiver extends DataTransferProtocol.Receiver
                   datanode.myMetrics.writesFromRemoteClient);
   }
 
+  @Override
+  protected void opTransferBlock(final DataInputStream in,
+      final ExtendedBlock blk, final String client,
+      final DatanodeInfo[] targets,
+      final Token<BlockTokenIdentifier> blockToken) throws IOException {
+    final DataOutputStream out = new DataOutputStream(
+        NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
+    checkAccess(out, blk, blockToken,
+        DataTransferProtocol.Op.TRANSFER_BLOCK,
+        BlockTokenSecretManager.AccessMode.COPY);
+
+    updateCurrentThreadName(DataTransferProtocol.Op.TRANSFER_BLOCK + " " + blk);
+    try {
+      datanode.transferReplicaForPipelineRecovery(blk, targets, client);
+      SUCCESS.write(out);
+    } finally {
+      IOUtils.closeStream(out);
+    }
+  }
+
   /**
    * Get block checksum (MD5 of CRC32).
    */

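Read together with the DataNode and BlockReceiver changes above, the end-to-end flow for adding a datanode (as it appears in this commit) is: the client sends Op.TRANSFER_BLOCK to the source datanode; DataXceiver.opTransferBlock verifies the block token and calls transferReplicaForPipelineRecovery, which pushes the replica to the new datanode via opWriteBlock in stage TRANSFER_RBW or TRANSFER_FINALIZED and waits for that datanode's close-ack; the source then writes SUCCESS back to the client. No RPC is involved, matching the HDFS-1761 entry in CHANGES.txt.
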
+ 33 - 22
src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java

@@ -1695,40 +1695,51 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     final long blockId = b.getBlockId();
     final long expectedGs = b.getGenerationStamp();
     final long visible = b.getNumBytes();
-    DataNode.LOG.info("Covert the temporary replica " + b
-        + " to RBW, visible length is " + visible);
+    DataNode.LOG.info("Convert replica " + b
+        + " from Temporary to RBW, visible length=" + visible);
 
-    // get replica
-    final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), blockId);
-    if (r == null) {
-      throw new ReplicaNotFoundException(
-          ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
-    }
-    // check the replica's state
-    if (r.getState() != ReplicaState.TEMPORARY) {
-      throw new ReplicaNotFoundException(
-          "r.getState() != ReplicaState.TEMPORARY, r=" + r);
+    final ReplicaInPipeline temp;
+    {
+      // get replica
+      final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), blockId);
+      if (r == null) {
+        throw new ReplicaNotFoundException(
+            ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
+      }
+      // check the replica's state
+      if (r.getState() != ReplicaState.TEMPORARY) {
+        throw new ReplicaAlreadyExistsException(
+            "r.getState() != ReplicaState.TEMPORARY, r=" + r);
+      }
+      temp = (ReplicaInPipeline)r;
     }
     // check generation stamp
-    if (r.getGenerationStamp() != expectedGs) {
-      throw new ReplicaNotFoundException(
-          "r.getGenerationStamp() != expectedGs = " + expectedGs + ", r=" + r);
+    if (temp.getGenerationStamp() != expectedGs) {
+      throw new ReplicaAlreadyExistsException(
+          "temp.getGenerationStamp() != expectedGs = " + expectedGs
+          + ", temp=" + temp);
     }
+
+    // TODO: check writer?
+    // set writer to the current thread
+    // temp.setWriter(Thread.currentThread());
+
     // check length
-    final long numBytes = r.getNumBytes();
+    final long numBytes = temp.getNumBytes();
     if (numBytes < visible) {
-      throw new ReplicaNotFoundException(numBytes + " = numBytes < visible = "
-          + visible + ", r=" + r);
+      throw new IOException(numBytes + " = numBytes < visible = "
+          + visible + ", temp=" + temp);
     }
     // check volume
-    final FSVolume v = r.getVolume();
+    final FSVolume v = temp.getVolume();
     if (v == null) {
-      throw new IOException("r.getVolume() = null, temp="  + r);
+      throw new IOException("r.getVolume() = null, temp="  + temp);
     }
     
     // move block files to the rbw directory
-    final File dest = moveBlockFiles(b.getLocalBlock(), r.getBlockFile(),
-        v.getRbwDir(b.getBlockPoolId()));
+    BlockPoolSlice bpslice = v.getBlockPoolSlice(b.getBlockPoolId());
+    final File dest = moveBlockFiles(b.getLocalBlock(), temp.getBlockFile(), 
+        bpslice.getRbwDir());
     // create RBW
     final ReplicaBeingWritten rbw = new ReplicaBeingWritten(
         blockId, numBytes, expectedGs,

+ 32 - 4
src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java

@@ -18,13 +18,16 @@
 
 package org.apache.hadoop.hdfs;
 
+import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileReader;
 import java.io.IOException;
-import java.io.FileInputStream;
-import java.io.DataInputStream;
+import java.net.Socket;
 import java.net.URL;
 import java.net.URLConnection;
 import java.security.PrivilegedExceptionAction;
@@ -40,16 +43,22 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
 
 /** Utilities for HDFS tests */
 public class DFSTestUtil {
@@ -389,4 +398,23 @@ public class DFSTestUtil {
     in.readFully(content);
     return content;
   }
+
+  /** For {@link TestTransferRbw} */
+  public static DataTransferProtocol.Status transferRbw(final ExtendedBlock b, 
+      final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException {
+    Assert.assertEquals(2, datanodes.length);
+    final Socket s = DFSOutputStream.createSocketForPipeline(datanodes, dfsClient);
+    final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length);
+    final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
+        NetUtils.getOutputStream(s, writeTimeout),
+        DataNode.SMALL_BUFFER_SIZE));
+    final DataInputStream in = new DataInputStream(NetUtils.getInputStream(s));
+
+    // send the request
+    DataTransferProtocol.Sender.opTransferBlock(out, b, dfsClient.clientName,
+        new DatanodeInfo[]{datanodes[1]}, new Token<BlockTokenIdentifier>());
+    out.flush();
+
+    return DataTransferProtocol.Status.read(in);
+  }
 }

+ 15 - 13
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java

@@ -22,22 +22,30 @@ import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Test;
 
 /** Test transferring RBW between datanodes */
 public class TestTransferRbw {
   private static final Log LOG = LogFactory.getLog(TestTransferRbw.class);
+  
+  {
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+  }
 
   private static final Random RAN = new Random();
   private static final short REPLICATION = (short)1;
@@ -87,7 +95,6 @@ public class TestTransferRbw {
       final ReplicaBeingWritten oldrbw;
       final DataNode newnode;
       final DatanodeInfo newnodeinfo;
-      final long visible;
       final String bpid = cluster.getNamesystem().getBlockPoolId();
       {
         final DataNode oldnode = cluster.getDataNodes().get(0);
@@ -98,6 +105,7 @@ public class TestTransferRbw {
         cluster.startDataNodes(conf, 1, true, null, null);
         newnode = cluster.getDataNodes().get(REPLICATION);
         
+        final DatanodeInfo oldnodeinfo;
         {
           final DatanodeInfo[] datatnodeinfos = cluster.getNameNode(
               ).getDatanodeReport(DatanodeReportType.LIVE);
@@ -107,23 +115,17 @@ public class TestTransferRbw {
               i < datatnodeinfos.length && !datatnodeinfos[i].equals(dnReg); i++);
           Assert.assertTrue(i < datatnodeinfos.length);
           newnodeinfo = datatnodeinfos[i];
+          oldnodeinfo = datatnodeinfos[1 - i];
         }
         
         //transfer RBW
-        visible = oldnode.transferBlockForPipelineRecovery(new ExtendedBlock(
-            bpid, oldrbw), new DatanodeInfo[] { newnodeinfo });
+        final ExtendedBlock b = new ExtendedBlock(bpid, oldrbw.getBlockId(), oldrbw.getBytesAcked(),
+            oldrbw.getGenerationStamp());
+        final DataTransferProtocol.Status s = DFSTestUtil.transferRbw(
+            b, fs.getClient(), oldnodeinfo, newnodeinfo);
+        Assert.assertEquals(DataTransferProtocol.Status.SUCCESS, s);
       }
 
-      //check temporary
-      final ReplicaInPipeline temp = getReplica(newnode, bpid,
-          ReplicaState.TEMPORARY);
-      LOG.info("temp = " + temp);
-      Assert.assertEquals(oldrbw.getBlockId(), temp.getBlockId());
-      Assert.assertEquals(oldrbw.getGenerationStamp(), temp.getGenerationStamp());
-      final ExtendedBlock b = new ExtendedBlock(bpid, oldrbw.getBlockId(),
-          visible, oldrbw.getGenerationStamp());
-      //convert temporary to rbw
-      newnode.convertTemporaryToRbw(b);
       //check new rbw
       final ReplicaBeingWritten newrbw = getRbw(newnode, bpid);
       LOG.info("newrbw = " + newrbw);

+ 21 - 0
svn-commit.tmp

@@ -0,0 +1,21 @@
+Merging changes r1087114:r1087900 from trunk to federation
+--This line, and those below, will be ignored--
+
+_M   .
+M    CHANGES.txt
+_M   src/test/hdfs
+M    src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java
+M    src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
+_M   src/contrib/hdfsproxy
+_M   src/java
+M    src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
+M    src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+M    src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
+M    src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+M    src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+_M   src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
+_M   src/webapps/datanode
+_M   src/webapps/hdfs
+_M   src/webapps/secondary
+_M   src/c++/libhdfs
+MM   build.xml