
Merging change r1081580 from trunk to federation.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1085174 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas · 14 years ago · commit a7c610b5fc

+ 2 - 0
CHANGES.txt

@@ -246,6 +246,8 @@ Trunk (unreleased changes)
     HDFS-1755. Federation: The BPOfferService must always connect to namenode as 
     the login user. (jitendra)
 
+    HDFS-1675. Support transferring RBW between datanodes. (szetszwo)
+
   IMPROVEMENTS
 
     HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)
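
Overview: this change adds support for transferring a replica-being-written
(RBW) between datanodes, for use in write-pipeline recovery. Pieced together
from the hunks and the new TestTransferRbw below, the intended flow is
roughly the following sketch (the recovery driver itself is not part of this
change and is shown only as an assumed caller):

    // 1. The source datanode streams its RBW to the target with the new
    //    TRANSFER_RBW stage; the target stores it as a TEMPORARY replica.
    long visible = srcDatanode.transferBlockForPipelineRecovery(
        block, new DatanodeInfo[] { target });
    // 2. The target promotes the TEMPORARY replica to RBW, recording the
    //    returned visible length as the number of acknowledged bytes.
    targetDatanode.convertTemporaryToRbw(new ExtendedBlock(
        bpid, block.getBlockId(), visible, block.getGenerationStamp()));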

+ 5 - 3
src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java

@@ -46,11 +46,11 @@ public interface DataTransferProtocol {
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 20:
+   * Version 21:
    *    Changed the protocol methods to use ExtendedBlock instead
    *    of Block.
    */
-  public static final int DATA_TRANSFER_VERSION = 19;
+  public static final int DATA_TRANSFER_VERSION = 21;
 
   /** Operation */
   public enum Op {
@@ -144,7 +144,9 @@ public interface DataTransferProtocol {
     // Recover a failed PIPELINE_CLOSE
     PIPELINE_CLOSE_RECOVERY,
     // pipeline set up for block creation
-    PIPELINE_SETUP_CREATE;
+    PIPELINE_SETUP_CREATE,
+    // similar to replication but transferring an RBW instead of a finalized replica
+    TRANSFER_RBW;
     
     final static private byte RECOVERY_BIT = (byte)1;
     
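
With the new stage, an RBW transfer reuses the existing write-block
operation; only the stage distinguishes it from replication. A minimal
sketch, mirroring the Sender call in DataNode.java below (empty client name,
since the transfer is datanode-initiated):

    DataTransferProtocol.Sender.opWriteBlock(out,
        b, 0, BlockConstructionStage.TRANSFER_RBW, // vs. PIPELINE_SETUP_CREATE
        0, 0, 0, "", srcNode, targets, accessToken);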

+ 13 - 4
src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -82,6 +82,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
   DatanodeInfo srcDataNode = null;
   private Checksum partialCrc = null;
   private final DataNode datanode;
+  private final BlockConstructionStage initialStage;
   final private ReplicaInPipelineInterface replicaInfo;
   volatile private boolean mirrorError;
 
@@ -98,6 +99,11 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
       this.clientName = clientName;
       this.srcDataNode = srcDataNode;
       this.datanode = datanode;
+      
+      //for a datanode-initiated write, we have
+      //1: clientName.length() == 0, and
+      //2: stage == null, PIPELINE_SETUP_CREATE or TRANSFER_RBW
+      this.initialStage = stage;
       //
       // Open local disk out
       //
@@ -652,9 +658,11 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
         // close the block/crc files
         close();
 
-        // Finalize the block. Does this fsync()?
-        block.setNumBytes(replicaInfo.getNumBytes());
-        datanode.data.finalizeBlock(block);
+        if (initialStage != BlockConstructionStage.TRANSFER_RBW) {
+          // Finalize the block. Does this fsync()?
+          block.setNumBytes(replicaInfo.getNumBytes());
+          datanode.data.finalizeBlock(block);
+        }
         datanode.myMetrics.blocksWritten.inc();
       }
 
@@ -685,7 +693,8 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
    * if this write is for a replication request (and not from a client)
    */
   private void cleanupBlock() throws IOException {
-    if (clientName.length() == 0) { // not client write
+    if (clientName.length() == 0
+        && initialStage != BlockConstructionStage.TRANSFER_RBW) {
       datanode.data.unfinalizeBlock(block);
     }
   }
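
Restating the two hunks above as predicates: on a datanode-initiated write,
the block is finalized on close, and unfinalized on error, only if the write
did not begin as TRANSFER_RBW; a transferred replica instead stays TEMPORARY
until it is promoted via convertTemporaryToRbw:

    boolean finalizeOnClose =
        initialStage != BlockConstructionStage.TRANSFER_RBW;
    boolean unfinalizeOnError = clientName.length() == 0
        && initialStage != BlockConstructionStage.TRANSFER_RBW;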

+ 65 - 17
src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -1780,7 +1780,8 @@ public class DataNode extends Configured
                  block + " to " + xfersBuilder);                       
       }
 
-      new Daemon(new DataTransfer(xferTargets, block, this)).start();
+      new Daemon(new DataTransfer(xferTargets, block,
+          BlockConstructionStage.PIPELINE_SETUP_CREATE)).start();
     }
   }
 
@@ -1881,21 +1882,21 @@ public class DataNode extends Configured
    * Used for transferring a block of data.  This class
    * sends a piece of data to another DataNode.
    */
-  class DataTransfer implements Runnable {
-    DatanodeInfo targets[];
-    ExtendedBlock b;
-    DataNode datanode;
+  private class DataTransfer implements Runnable {
+    final DatanodeInfo[] targets;
+    final ExtendedBlock b;
+    final BlockConstructionStage stage;
     final private DatanodeRegistration bpReg;
 
     /**
      * Connect to the first item in the target list.  Pass along the 
      * entire target list, the block, and the data.
      */
-    public DataTransfer(DatanodeInfo targets[], ExtendedBlock b,
-        DataNode datanode) throws IOException {
+    DataTransfer(DatanodeInfo targets[], ExtendedBlock b, BlockConstructionStage stage
+        ) throws IOException {
       this.targets = targets;
       this.b = b;
-      this.datanode = datanode;
+      this.stage = stage;
       BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId());
       bpReg = bpos.bpRegistration;
     }
@@ -1923,7 +1924,7 @@ public class DataNode extends Configured
                                                             SMALL_BUFFER_SIZE));
 
         blockSender = new BlockSender(b, 0, b.getNumBytes(), 
-            false, false, false, datanode);
+            false, false, false, DataNode.this);
         DatanodeInfo srcNode = new DatanodeInfo(bpReg);
 
         //
@@ -1934,9 +1935,9 @@ public class DataNode extends Configured
           accessToken = blockPoolTokenSecretManager.generateToken(b, 
               EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
         }
+
         DataTransferProtocol.Sender.opWriteBlock(out,
-            b, 0, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, "",
-            srcNode, targets, accessToken);
+            b, 0, stage, 0, 0, 0, "", srcNode, targets, accessToken);
 
         // send data & checksum
         blockSender.sendBlock(out, baseStream, null);
@@ -1948,7 +1949,7 @@ public class DataNode extends Configured
         LOG.warn(bpReg + ":Failed to transfer " + b + " to " + targets[0].getName()
             + " got " + StringUtils.stringifyException(ie));
         // check if there are any disk problem
-        datanode.checkDiskError();
+        checkDiskError();
         
       } finally {
         xmitsInProgress.getAndDecrement();
@@ -2504,13 +2505,17 @@ public class DataNode extends Configured
   /** {@inheritDoc} */
   @Override // ClientDataNodeProtocol
   public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException {
+    checkWriteAccess(block);
+    return data.getReplicaVisibleLength(block);
+  }
+
+  private void checkWriteAccess(final ExtendedBlock block) throws IOException {
     if (isBlockTokenEnabled) {
       Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
           .getTokenIdentifiers();
       if (tokenIds.size() != 1) {
-        throw new IOException("Can't continue with getReplicaVisibleLength() "
-            + "authorization since none or more than one BlockTokenIdentifier "
-            + "is found.");
+        throw new IOException("Can't continue since zero or more than one "
+            + "BlockTokenIdentifier was found.");
       }
       for (TokenIdentifier tokenId : tokenIds) {
         BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId;
@@ -2521,10 +2526,53 @@ public class DataNode extends Configured
             BlockTokenSecretManager.AccessMode.WRITE);
       }
     }
+  }
 
-    return data.getReplicaVisibleLength(block);
+  /**
+   * Transfer a block to the datanode targets.
+   * @return the RBW's visible length
+   */
+  long transferBlockForPipelineRecovery(final ExtendedBlock b,
+      final DatanodeInfo[] targets) throws IOException {
+    checkWriteAccess(b);
+    final Block stored;
+    final boolean isRbw;
+    final long visible;
+
+    //get replica information
+    synchronized(data) {
+      stored = data.getStoredBlock(b.getBlockPoolId(), b.getBlockId());
+      if (stored.getGenerationStamp() < b.getGenerationStamp()) {
+        throw new IOException(
+            "stored.getGenerationStamp() < b.getGenerationStamp(), stored="
+            + stored + ", b=" + b);        
+      }
+      isRbw = data.isValidRbw(b);
+      visible = data.getReplicaVisibleLength(b);
+    }
+
+    if (targets.length > 0) {
+      if (isRbw) {
+        //transfer rbw
+        new DataTransfer(targets, b, BlockConstructionStage.TRANSFER_RBW).run();
+      } else {
+        //transfer finalized replica
+        transferBlock(new ExtendedBlock(b.getBlockPoolId(), stored), targets);
+      }
+    }
+    //TODO: should return: visible + storedGS + isRbw
+    return visible;
   }
-  
+
+  /**
+   * Convert an existing temporary replica to an RBW.
+   * @param temporary specifies the block id, generation stamp and visible bytes.
+   * @throws IOException
+   */
+  void convertTemporaryToRbw(final ExtendedBlock temporary) throws IOException {
+    data.convertTemporaryToRbw(temporary);
+  }
+
   // Determine a Datanode's streaming address
   public static InetSocketAddress getStreamingAddr(Configuration conf) {
     return NetUtils.createSocketAddr(
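
The refactored DataTransfer is now used in two ways, both visible above:
asynchronously for ordinary replication, and synchronously (run() on the
caller's thread) for the RBW recovery path, so that
transferBlockForPipelineRecovery returns its visible length only after the
stream to the target has completed:

    // replication: fire-and-forget, as before
    new Daemon(new DataTransfer(xferTargets, block,
        BlockConstructionStage.PIPELINE_SETUP_CREATE)).start();
    // pipeline recovery: run inline so the caller can block on completion
    new DataTransfer(targets, b, BlockConstructionStage.TRANSFER_RBW).run();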

+ 2 - 1
src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -392,7 +392,8 @@ class DataXceiver extends DataTransferProtocol.Receiver
       // if this write is for a replication request or recovering
       // a failed close for client, then confirm block. For other client-writes,
       // the block is finalized in the PacketResponder.
-      if (client.length() == 0 || 
+      if ((client.length() == 0 && stage != BlockConstructionStage.TRANSFER_RBW)
+          ||
           stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
         datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
         LOG.info("Received block " + block + 
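
Restated, the receiver confirms the block to the namenode for
datanode-initiated writes except RBW transfers (which are not finalized
yet), and for client close recovery:

    boolean confirmBlock =
        (client.length() == 0 && stage != BlockConstructionStage.TRANSFER_RBW)
        || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY;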

+ 88 - 22
src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java

@@ -129,21 +129,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     private File addBlock(Block b, File src, boolean createOk, 
                           boolean resetIdx) throws IOException {
       if (numBlocks < maxBlocksPerDir) {
-        File dest = new File(dir, b.getBlockName());
-        File metaData = getMetaFile(src, b.getGenerationStamp());
-        File newmeta = getMetaFile(dest, b.getGenerationStamp());
-        if ( ! metaData.renameTo( newmeta ) ||
-            ! src.renameTo( dest ) ) {
-          throw new IOException( "could not move files for " + b +
-                                 " from " + src + " to " + 
-                                 dest.getAbsolutePath() + " or from"
-                                 + metaData + " to " + newmeta);
-        }
-        if (DataNode.LOG.isDebugEnabled()) {
-          DataNode.LOG.debug("addBlock: Moved " + metaData + " to " + newmeta);
-          DataNode.LOG.debug("addBlock: Moved " + src + " to " + dest);
-        }
-
+        final File dest = moveBlockFiles(b, src, dir);
         numBlocks += 1;
         return dest;
       }
@@ -1289,6 +1275,26 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     return info.unlinkBlock(numLinks);
   }
 
+  private static File moveBlockFiles(Block b, File srcfile, File destdir
+      ) throws IOException {
+    final File dstfile = new File(destdir, b.getBlockName());
+    final File srcmeta = getMetaFile(srcfile, b.getGenerationStamp());
+    final File dstmeta = getMetaFile(dstfile, b.getGenerationStamp());
+    if (!srcmeta.renameTo(dstmeta)) {
+      throw new IOException("Failed to move meta file for " + b
+          + " from " + srcmeta + " to " + dstmeta);
+    }
+    if (!srcfile.renameTo(dstfile)) {
+      throw new IOException("Failed to move block file for " + b
+          + " from " + srcfile + " to " + dstfile.getAbsolutePath());
+    }
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("addBlock: Moved " + srcmeta + " to " + dstmeta);
+      DataNode.LOG.debug("addBlock: Moved " + srcfile + " to " + dstfile);
+    }
+    return dstfile;
+  }
+
   static private void truncateBlock(File blockFile, File metaFile,
       long oldlen, long newlen) throws IOException {
     DataNode.LOG.info("truncateBlock: blockFile=" + blockFile
@@ -1619,6 +1625,56 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     return rbw;
   }
   
+  @Override // FSDatasetInterface
+  public synchronized ReplicaInPipelineInterface convertTemporaryToRbw(
+      final ExtendedBlock b) throws IOException {
+    final long blockId = b.getBlockId();
+    final long expectedGs = b.getGenerationStamp();
+    final long visible = b.getNumBytes();
+    DataNode.LOG.info("Converting temporary replica " + b
+        + " to RBW, visible length is " + visible);
+
+    // get replica
+    final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), blockId);
+    if (r == null) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
+    }
+    // check the replica's state
+    if (r.getState() != ReplicaState.TEMPORARY) {
+      throw new ReplicaNotFoundException(
+          "r.getState() != ReplicaState.TEMPORARY, r=" + r);
+    }
+    // check generation stamp
+    if (r.getGenerationStamp() != expectedGs) {
+      throw new ReplicaNotFoundException(
+          "r.getGenerationStamp() != expectedGs = " + expectedGs + ", r=" + r);
+    }
+    // check length
+    final long numBytes = r.getNumBytes();
+    if (numBytes < visible) {
+      throw new ReplicaNotFoundException(numBytes + " = numBytes < visible = "
+          + visible + ", r=" + r);
+    }
+    // check volume
+    final FSVolume v = r.getVolume();
+    if (v == null) {
+      throw new IOException("r.getVolume() = null, temp="  + r);
+    }
+    
+    // move block files to the rbw directory
+    final File dest = moveBlockFiles(b.getLocalBlock(), r.getBlockFile(),
+        v.getRbwDir(b.getBlockPoolId()));
+    // create RBW
+    final ReplicaBeingWritten rbw = new ReplicaBeingWritten(
+        blockId, numBytes, expectedGs,
+        v, dest.getParentFile(), Thread.currentThread());
+    rbw.setBytesAcked(visible);
+    // overwrite the RBW in the volume map
+    volumeMap.add(b.getBlockPoolId(), rbw);
+    return rbw;
+  }
+
   @Override // FSDatasetInterface
   public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b)
       throws IOException {
@@ -1815,14 +1871,24 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
    */
   @Override // FSDatasetInterface
   public boolean isValidBlock(ExtendedBlock b) {
-    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), 
+    return isValid(b, ReplicaState.FINALIZED);
+  }
+
+  /**
+   * Check whether the given block is a valid RBW.
+   */
+  @Override // FSDatasetInterface
+  public boolean isValidRbw(final ExtendedBlock b) {
+    return isValid(b, ReplicaState.RBW);
+  }
+
+  /** Does the block exist and have the given state? */
+  private boolean isValid(final ExtendedBlock b, final ReplicaState state) {
+    final ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), 
         b.getLocalBlock());
-    
-    if (replicaInfo == null || 
-        replicaInfo.getState() != ReplicaState.FINALIZED) {
-      return false;
-    }
-    return replicaInfo.getBlockFile().exists();
+    return replicaInfo != null
+        && replicaInfo.getState() == state
+        && replicaInfo.getBlockFile().exists();
   }
 
   /**
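
Note that convertTemporaryToRbw interprets the numBytes field of its
ExtendedBlock argument as the visible (acknowledged) length; the on-disk
length may be larger. A minimal usage sketch, where bpid, blockId,
visibleLen and gs are assumed to come from the source datanode (as in
TestTransferRbw below):

    final ExtendedBlock b = new ExtendedBlock(bpid, blockId, visibleLen, gs);
    // moves the block and meta files to the rbw directory and replaces the
    // volume map entry with a ReplicaBeingWritten in state RBW
    final ReplicaInPipelineInterface rbw = dataset.convertTemporaryToRbw(b);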

+ 15 - 0
src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java

@@ -217,6 +217,14 @@ public interface FSDatasetInterface extends FSDatasetMBean {
       long newGS, long minBytesRcvd, long maxBytesRcvd)
   throws IOException;
 
+  /**
+   * Convert a temporary replica to an RBW.
+   * @param temporary the temporary replica being converted
+   * @return the resulting RBW
+   */
+  public ReplicaInPipelineInterface convertTemporaryToRbw(
+      ExtendedBlock temporary) throws IOException;
+
   /**
    * Append to a finalized replica and returns the meta info of the replica
    * 
@@ -286,6 +294,13 @@ public interface FSDatasetInterface extends FSDatasetMBean {
    */
   public boolean isValidBlock(ExtendedBlock b);
 
+  /**
+   * Is the block a valid RBW?
+   * @param b the block to check
+   * @return true if the specified block is a valid RBW
+   */
+  public boolean isValidRbw(ExtendedBlock b);
+
   /**
    * Invalidates the specified blocks
    * @param bpid Block pool Id

+ 23 - 4
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -427,7 +427,7 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
 
   @Override // FSDatasetInterface
   public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
-    if (isBeingWritten(b)) {
+    if (isValidRbw(b)) {
       blockMap.remove(b.getLocalBlock());
     }
   }
@@ -546,7 +546,8 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
   }
 
   /* check if a block is created but not finalized */
-  private synchronized boolean isBeingWritten(ExtendedBlock b) {
+  @Override
+  public synchronized boolean isValidRbw(ExtendedBlock b) {
     final Map<Block, BInfo> map = blockMap.get(b.getBlockPoolId());
     if (map == null) {
       return false;
@@ -557,7 +558,8 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
     }
     return !binfo.isFinalized();  
   }
-  
+
+  @Override
   public String toString() {
     return getStorageInfo();
   }
@@ -642,7 +644,7 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
           throw new ReplicaAlreadyExistsException("Block " + b + 
               " is valid, and cannot be written to.");
       }
-    if (isBeingWritten(b)) {
+    if (isValidRbw(b)) {
         throw new ReplicaAlreadyExistsException("Block " + b + 
             " is being written, and cannot be written to.");
     }
@@ -940,4 +942,21 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
     blockMap.put(bpid, map);
     storage.addBlockPool(bpid);
   }
+
+  @Override
+  public ReplicaInPipelineInterface convertTemporaryToRbw(ExtendedBlock temporary)
+      throws IOException {
+    final Map<Block, BInfo> map = blockMap.get(temporary.getBlockPoolId());
+    if (map == null) {
+      throw new IOException("Block pool not found, temporary=" + temporary);
+    }
+    final BInfo r = map.get(temporary.getLocalBlock());
+    if (r == null) {
+      throw new IOException("Block not found, temporary=" + temporary);
+    } else if (r.isFinalized()) {
+      throw new IOException("Replica already finalized, temporary="
+          + temporary + ", r=" + r);
+    }
+    return r;
+  }
 }

+ 139 - 0
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java

@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.util.Collection;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test transferring RBW between datanodes */
+public class TestTransferRbw {
+  private static final Log LOG = LogFactory.getLog(TestTransferRbw.class);
+
+  private static final Random RAN = new Random();
+  private static final short REPLICATION = (short)1;
+
+  private static ReplicaBeingWritten getRbw(final DataNode datanode,
+      String bpid) throws InterruptedException {
+    return (ReplicaBeingWritten)getReplica(datanode, bpid, ReplicaState.RBW);
+  }
+  private static ReplicaInPipeline getReplica(final DataNode datanode,
+      final String bpid, final ReplicaState expectedState) throws InterruptedException {
+    final FSDataset dataset = ((FSDataset)datanode.data);
+    final Collection<ReplicaInfo> replicas = dataset.volumeMap.replicas(bpid);
+    for(int i = 0; i < 5 && replicas.size() == 0; i++) {
+      LOG.info("wait since replicas.size() == 0; i=" + i);
+      Thread.sleep(1000);
+    }
+    Assert.assertEquals(1, replicas.size());
+    final ReplicaInfo r = replicas.iterator().next();
+    Assert.assertEquals(expectedState, r.getState());
+    return (ReplicaInPipeline)r;
+  }
+
+  @Test
+  public void testTransferRbw() throws Exception {
+    final HdfsConfiguration conf = new HdfsConfiguration();
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
+        ).numDataNodes(REPLICATION).build();
+    try {
+      cluster.waitActive();
+      final DistributedFileSystem fs = (DistributedFileSystem)cluster.getFileSystem();
+
+      //create a file, write some data and leave it open. 
+      final Path p = new Path("/foo");
+      final int size = (1 << 16) + RAN.nextInt(1 << 16);
+      LOG.info("size = " + size);
+      final FSDataOutputStream out = fs.create(p, REPLICATION);
+      final byte[] bytes = new byte[1024];
+      for(int remaining = size; remaining > 0; ) {
+        RAN.nextBytes(bytes);
+        final int len = bytes.length < remaining? bytes.length: remaining;
+        out.write(bytes, 0, len);
+        out.hflush();
+        remaining -= len;
+      }
+
+      //get the RBW
+      final ReplicaBeingWritten oldrbw;
+      final DataNode newnode;
+      final DatanodeInfo newnodeinfo;
+      final long visible;
+      final String bpid = cluster.getNamesystem().getBlockPoolId();
+      {
+        final DataNode oldnode = cluster.getDataNodes().get(0);
+        oldrbw = getRbw(oldnode, bpid);
+        LOG.info("oldrbw = " + oldrbw);
+        
+        //add a datanode
+        cluster.startDataNodes(conf, 1, true, null, null);
+        newnode = cluster.getDataNodes().get(REPLICATION);
+        
+        {
+          final DatanodeInfo[] datanodeinfos = cluster.getNameNode(
+              ).getDatanodeReport(DatanodeReportType.LIVE);
+          Assert.assertEquals(2, datanodeinfos.length);
+          int i = 0;
+          for(DatanodeRegistration dnReg = newnode.getDNRegistrationForBP(bpid);
+              i < datanodeinfos.length && !datanodeinfos[i].equals(dnReg); i++);
+          Assert.assertTrue(i < datanodeinfos.length);
+          newnodeinfo = datanodeinfos[i];
+        }
+        
+        //transfer RBW
+        visible = oldnode.transferBlockForPipelineRecovery(new ExtendedBlock(
+            bpid, oldrbw), new DatanodeInfo[] { newnodeinfo });
+      }
+
+      //check temporary
+      final ReplicaInPipeline temp = getReplica(newnode, bpid,
+          ReplicaState.TEMPORARY);
+      LOG.info("temp = " + temp);
+      Assert.assertEquals(oldrbw.getBlockId(), temp.getBlockId());
+      Assert.assertEquals(oldrbw.getGenerationStamp(), temp.getGenerationStamp());
+      final ExtendedBlock b = new ExtendedBlock(bpid, oldrbw.getBlockId(),
+          visible, oldrbw.getGenerationStamp());
+      //convert temporary to rbw
+      newnode.convertTemporaryToRbw(b);
+      //check new rbw
+      final ReplicaBeingWritten newrbw = getRbw(newnode, bpid);
+      LOG.info("newrbw = " + newrbw);
+      Assert.assertEquals(oldrbw.getBlockId(), newrbw.getBlockId());
+      Assert.assertEquals(oldrbw.getGenerationStamp(), newrbw.getGenerationStamp());
+      Assert.assertEquals(oldrbw.getVisibleLength(), newrbw.getVisibleLength());
+
+      LOG.info("DONE");
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}