
HDFS-543. Break FSDatasetInterface#writeToBlock() into writeToTemporary, writeToRBW, and append. Contributed by Hairong Kuang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-265@808512 13f79535-47bb-0310-9956-ffa450edef68
Hairong Kuang, 15 years ago
parent
commit
2626fd61e8

+ 3 - 0
CHANGES.txt

@@ -113,6 +113,9 @@ Trunk (unreleased changes)
     HDFS-562. Add a test for NameNode.getBlockLocations(..) to check read from
     un-closed file.  (szetszwo)
 
+    HDFS-543. Break FSDatasetInterface#writeToBlock() into writeToTemporary,
+    writeToRBW, and append. (hairong)
+
   BUG FIXES
 
     HDFS-76. Better error message to users when commands fail because of 
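In caller terms, code that used the old writeToBlock(block, isRecovery) now picks one of the three new methods. A minimal illustrative sketch of that dispatch, mirroring the BlockReceiver change later in this patch (openWriteStreams is a hypothetical helper name; data, block, clientName, isRecovery and finalized follow the BlockReceiver fields; this is illustration, not part of the patch):

    BlockWriteStreams openWriteStreams(FSDatasetInterface data, Block block,
        String clientName, boolean isRecovery, boolean finalized) throws IOException {
      if (clientName.length() == 0) {        // replication or block move
        return data.writeToTemporary(block);
      } else if (finalized && isRecovery) {  // client append to a finalized replica
        return data.append(block);
      } else {                               // ordinary client write, or RBW recovery
        return data.writeToRbw(block, isRecovery);
      }
    }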

+ 43 - 0
src/java/org/apache/hadoop/hdfs/server/datanode/BlockNotFoundException.java

@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+/**
+ * Exception indicating that DataNode does not have a replica
+ * that matches the target block.  
+ */
+class BlockNotFoundException extends IOException {
+  private static final long serialVersionUID = 1L;
+  final static String NON_RBW_REPLICA = "Cannot recover a non-RBW replica";
+  final static String UNFINALIZED_REPLICA = 
+    "Cannot append to an unfinalized replica ";
+  final static String NON_EXISTENT_REPLICA =
+    "Cannot append to a non-existent replica ";
+
+  public BlockNotFoundException() {
+    super();
+  }
+
+  public BlockNotFoundException(String msg) {
+    super(msg);
+  }
+}

+ 11 - 2
src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -95,11 +95,18 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
       this.checksum = DataChecksum.newDataChecksum(in);
       this.bytesPerChecksum = checksum.getBytesPerChecksum();
       this.checksumSize = checksum.getChecksumSize();
+      this.finalized = datanode.data.isValidBlock(block);
       //
       // Open local disk out
       //
-      streams = datanode.data.writeToBlock(block, isRecovery);
-      this.finalized = datanode.data.isValidBlock(block);
+      if (clientName.length() == 0) { //replication or move
+        streams = datanode.data.writeToTemporary(block);
+      } else if (finalized && isRecovery) { // client append
+        streams = datanode.data.append(block);
+        this.finalized = false;
+      } else { // client write
+        streams = datanode.data.writeToRbw(block, isRecovery);
+      }
       if (streams != null) {
         this.out = streams.dataOut;
         this.checksumOut = new DataOutputStream(new BufferedOutputStream(
@@ -113,6 +120,8 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
       }
     } catch (BlockAlreadyExistsException bae) {
       throw bae;
+    } catch (BlockNotFoundException bne) {
+      throw bne;
     } catch(IOException ioe) {
       IOUtils.closeStream(this);
       cleanupBlock();

+ 165 - 166
src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java

@@ -28,10 +28,8 @@ import java.io.InputStream;
 import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 
 import javax.management.NotCompliantMBeanException;
@@ -39,7 +37,6 @@ import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.DU;
 import org.apache.hadoop.fs.FileUtil;
@@ -55,7 +52,6 @@ import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.apache.hadoop.conf.*;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.io.IOUtils;
 
@@ -292,7 +288,6 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     
     FSVolume(File currentDir, Configuration conf) throws IOException {
       this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
-      boolean supportAppends = conf.getBoolean("dfs.support.append", false);
       File parent = currentDir.getParentFile();
       final File finalizedDir = new File(
           currentDir, DataStorage.STORAGE_DIR_FINALIZED);
@@ -309,11 +304,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
       //
       this.tmpDir = new File(parent, "tmp");
       if (tmpDir.exists()) {
-        if (supportAppends) {
-          recoverDetachedBlocks(finalizedDir, tmpDir);
-        } else {
-          FileUtil.fullyDelete(tmpDir);
-        }
+        FileUtil.fullyDelete(tmpDir);
       }
       this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
       if (rbwDir.exists() && !supportAppends) {
@@ -374,14 +365,23 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     }
     
     /**
-     * Temporary files. They get moved to the real block directory either when
-     * the block is finalized or the datanode restarts.
+     * Temporary files. They get moved to the finalized block directory when
+     * the block is finalized.
      */
     File createTmpFile(Block b) throws IOException {
       File f = new File(tmpDir, b.getBlockName());
       return createTmpFile(b, f);
     }
 
+    /**
+     * RBW files. They get moved to the finalized block directory when
+     * the block is finalized.
+     */
+    File createRbwFile(Block b) throws IOException {
+      File f = new File(rbwDir, b.getBlockName());
+      return createTmpFile(b, f);
+    }
+
     /**
      * Returns the name of the temporary file for this block.
      */
@@ -832,11 +832,14 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
   // Used for synchronizing access to usage stats
   private Object statsLock = new Object();
 
+  boolean supportAppends = false;
+
   /**
    * An FSDataset has a directory where it loads its data files.
    */
   public FSDataset(DataStorage storage, Configuration conf) throws IOException {
     this.maxBlocksPerDir = conf.getInt("dfs.datanode.numblocks", 64);
+    this.supportAppends = conf.getBoolean("dfs.support.append", false);
     FSVolume[] volArray = new FSVolume[storage.getNumStorageDirs()];
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
       volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(), conf);
@@ -937,13 +940,12 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
                           long blkOffset, long ckoff) throws IOException {
 
     ReplicaInfo info = getReplicaInfo(b);
-    FSVolume v = info.getVolume();
-    File blockFile = v.getTmpFile(b);
+    File blockFile = info.getBlockFile();
     RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
     if (blkOffset > 0) {
       blockInFile.seek(blkOffset);
     }
-    File metaFile = getMetaFile(blockFile, b);
+    File metaFile = info.getMetaFile();
     RandomAccessFile metaInFile = new RandomAccessFile(metaFile, "r");
     if (ckoff > 0) {
       metaInFile.seek(ckoff);
@@ -984,56 +986,15 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
           + ") to newblock (=" + newblock + ").");
     }
     
-    for(;;) {
-      final List<Thread> threads = tryUpdateBlock(oldblock, newblock);
-      if (threads == null) {
-        return;
-      }
-
-      // interrupt and wait for all ongoing create threads
-      for(Thread t : threads) {
-        t.interrupt();
-      }
-      for(Thread t : threads) {
-        try {
-          t.join();
-        } catch (InterruptedException e) {
-          DataNode.LOG.warn("interruptOngoingCreates: t=" + t, e);
-        }
-      }
-    }
-  }
-
-  /**
-   * Try to update an old block to a new block.
-   * If there are write threads running for the old block,
-   * the threads will be returned without updating the block. 
-   * 
-   * @return write threads if there is any. Otherwise, return null.
-   */
-  private synchronized List<Thread> tryUpdateBlock(
-      Block oldblock, Block newblock) throws IOException {
-    //check write threads
     final ReplicaInfo replicaInfo = volumeMap.get(oldblock.getBlockId());
     File blockFile = replicaInfo==null?null:replicaInfo.getBlockFile();
     if (blockFile == null) {
       throw new IOException("Block " + oldblock + " does not exist.");
     }
 
+    //check write threads
     if (replicaInfo instanceof ReplicaInPipeline) {
-      List<Thread> threads = ((ReplicaInPipeline)replicaInfo).getThreads();
-      //remove dead threads
-      for(Iterator<Thread> i = threads.iterator(); i.hasNext(); ) {
-        final Thread t = i.next();
-        if (!t.isAlive()) {
-          i.remove();
-        }
-      }
-
-      //return living threads
-      if (!threads.isEmpty()) {
-        return new ArrayList<Thread>(threads);
-      }
+      ((ReplicaInPipeline)replicaInfo).stopWriter();
     }
 
     //No ongoing create threads is alive.  Update block.
@@ -1076,7 +1037,6 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     // paranoia! verify that the contents of the stored block 
     // matches the block file on disk.
     validateBlockMetadata(newblock);
-    return null;
   }
 
   static private void truncateBlock(File blockFile, File metaFile,
@@ -1139,115 +1099,109 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     }
   }
 
-  /**
-   * Start writing to a block file
-   * If isRecovery is true and the block pre-exists, then we kill all
-   * other threads that might be writing to this block, and then reopen the file.
-   */
-  public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException {
-    //
-    // Make sure the block isn't a valid one - we're still creating it!
-    //
+  @Override  // FSDatasetInterface
+  public BlockWriteStreams append(Block b)
+      throws IOException {
+    // If the block was successfully finalized because all packets
+    // were successfully processed at the Datanode but the ack for
+    // some of the packets were not received by the client. The client 
+    // re-opens the connection and retries sending those packets.
+    // The other reason is that an "append" is occurring to this block.
+    
     ReplicaInfo replicaInfo = volumeMap.get(b);
-    if (isValidBlock(b)) {
-      if (!isRecovery) {
-        throw new BlockAlreadyExistsException("Block " + b + " is valid, and cannot be written to.");
-      }
-      // If the block was successfully finalized because all packets
-      // were successfully processed at the Datanode but the ack for
-      // some of the packets were not received by the client. The client 
-      // re-opens the connection and retries sending those packets.
-      // The other reason is that an "append" is occurring to this block.
-      if (replicaInfo != null) {
-        replicaInfo.detachBlock(1);
-      }
+    // check the validity of the parameter
+    if (replicaInfo == null) {
+      throw new BlockNotFoundException(
+          BlockNotFoundException.NON_EXISTENT_REPLICA + b);
+    }  
+    if (replicaInfo.getState() != ReplicaState.FINALIZED) {
+      throw new BlockNotFoundException(
+          BlockNotFoundException.UNFINALIZED_REPLICA + b);
     }
-    long blockSize = b.getNumBytes();
+    
+    DataNode.LOG.info("Reopen Block for append " + b);
 
-    //
-    // Serialize access to /tmp, and check if file already there.
-    //
-    File f = null;
-    List<Thread> threads = null;
-    synchronized (this) {
-      //
-      // Is it already in the create process?
-      //
-      if (replicaInfo != null && replicaInfo instanceof ReplicaInPipeline) {
-        f = replicaInfo.getBlockFile();
-        threads = ((ReplicaInPipeline)replicaInfo).getThreads();
-        
-        if (!isRecovery) {
-          throw new BlockAlreadyExistsException("Block " + b +
-                                  " has already been started (though not completed), and thus cannot be created.");
-        } else {
-          for (Thread thread:threads) {
-            thread.interrupt();
-          }
-        }
-      }
-      FSVolume v = null;
-      if (!isRecovery) { // create a new block
-        v = volumes.getNextVolume(blockSize);
-        // create temporary file to hold block in the designated volume
-        f = createTmpFile(v, b);
-        replicaInfo = new ReplicaInPipeline(b.getBlockId(), 
-            b.getGenerationStamp(), v, f.getParentFile());
-        volumeMap.add(replicaInfo);
-      } else if (f != null) {
-        DataNode.LOG.info("Reopen already-open Block for append " + b);
-      } else {
-        // reopening block for appending to it.
-        DataNode.LOG.info("Reopen Block for append " + b);
-        v = replicaInfo.getVolume();
-        f = createTmpFile(v, b);
-        File blkfile = replicaInfo.getBlockFile();
-        File oldmeta = replicaInfo.getMetaFile();
-        replicaInfo = new ReplicaInPipeline(replicaInfo,
-            v, f.getParentFile(), threads);
-        File newmeta = replicaInfo.getMetaFile();
-
-        // rename meta file to tmp directory
-        DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
-        if (!oldmeta.renameTo(newmeta)) {
-          throw new IOException("Block " + b + " reopen failed. " +
-                                " Unable to move meta file  " + oldmeta +
-                                " to tmp dir " + newmeta);
-        }
+    // unlink the finalized replica
+    replicaInfo.detachBlock(1);
+    
+    // construct a RBW replica
+    File blkfile = replicaInfo.getBlockFile();
+    FSVolume v = volumes.getNextVolume(b.getNumBytes());
+    File newBlkFile = v.createRbwFile(b);
+    File oldmeta = replicaInfo.getMetaFile();
+    replicaInfo = new ReplicaBeingWritten(replicaInfo,
+        v, newBlkFile.getParentFile(), Thread.currentThread());
+    File newmeta = replicaInfo.getMetaFile();
+
+    // rename meta file to rbw directory
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+    }
+    if (!oldmeta.renameTo(newmeta)) {
+      throw new IOException("Block " + b + " reopen failed. " +
+                            " Unable to move meta file  " + oldmeta +
+                            " to rbw dir " + newmeta);
+    }
+
+    // rename block file to rbw directory
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("Renaming " + blkfile + " to " + newBlkFile);
+    }
+    if (!blkfile.renameTo(newBlkFile)) {
+      if (!newmeta.renameTo(oldmeta)) {  // restore the meta file
+        DataNode.LOG.warn("Cannot move meta file " + newmeta + 
+            "back to the finalized directory " + oldmeta);
+      }
+      throw new IOException("Block " + b + " reopen failed. " +
+                              " Unable to move block file " + blkfile +
+                              " to rbw dir " + newBlkFile);
+    }
+    
+    // Replace finalized replica by a RBW replica in replicas map
+    volumeMap.add(replicaInfo);
+    
+    File metafile = getMetaFile(newBlkFile, b);
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("append blockfile is " + newBlkFile 
+                       + " of size " + newBlkFile.length());
+      DataNode.LOG.debug("append metafile is " + metafile 
+                       + " of size " + metafile.length());
+    }    
+    // return the write stream
+    return createBlockWriteStreams(newBlkFile , metafile);
+  }
 
-        // rename block file to tmp directory
-        DataNode.LOG.debug("Renaming " + blkfile + " to " + f);
-        if (!blkfile.renameTo(f)) {
-          if (!f.delete()) {
-            throw new IOException("Block " + b + " reopen failed. " +
-                                  " Unable to remove file " + f);
-          }
-          if (!blkfile.renameTo(f)) {
-            throw new IOException("Block " + b + " reopen failed. " +
-                                  " Unable to move block file " + blkfile +
-                                  " to tmp dir " + f);
-          }
-        }
-        volumeMap.add(replicaInfo);
+  @Override
+  public BlockWriteStreams writeToRbw(Block b, boolean isRecovery)
+      throws IOException {
+    ReplicaInfo replicaInfo = volumeMap.get(b);
+    File f = null;
+    if (replicaInfo == null) { // create a new block
+      FSVolume v = volumes.getNextVolume(b.getNumBytes());
+      // create a rbw file to hold block in the designated volume
+      f = v.createRbwFile(b);
+      replicaInfo = new ReplicaBeingWritten(b.getBlockId(), 
+          b.getGenerationStamp(), v, f.getParentFile());
+      volumeMap.add(replicaInfo);
+    } else {
+      if (!isRecovery) {
+        throw new BlockAlreadyExistsException("Block " + b +
+        " already exists in state " + replicaInfo.getState() +
+        " and thus cannot be created.");
       }
-      if (f == null) {
-        DataNode.LOG.warn("Block " + b + " reopen failed " +
-                          " Unable to locate tmp file.");
-        throw new IOException("Block " + b + " reopen failed " +
-                              " Unable to locate tmp file.");
+      if (replicaInfo.getState() != ReplicaState.RBW) {
+        throw new BlockNotFoundException(
+            BlockNotFoundException.NON_RBW_REPLICA + b);
       }
-    }
-
-    try {
-      if (threads != null) {
-        for (Thread thread:threads) {
-          thread.join();
-        }
+      ReplicaInPipeline replicaInPipeline = (ReplicaInPipeline)replicaInfo;
+      synchronized (this) {
+        //
+        // Is it already in the write process?
+        //
+        replicaInPipeline.stopWriter();
+        replicaInPipeline.setWriter(Thread.currentThread());
       }
-    } catch (InterruptedException e) {
-      throw new IOException("Recovery waiting for thread interrupted.");
+      f = replicaInfo.getBlockFile();
     }
 
     //
@@ -1256,8 +1210,42 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     // block size, so clients can't go crazy
     //
     File metafile = getMetaFile(f, b);
-    DataNode.LOG.debug("writeTo blockfile is " + f + " of size " + f.length());
-    DataNode.LOG.debug("writeTo metafile is " + metafile + " of size " + metafile.length());
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("writeToRbw blockfile is " + f +
+                         " of size " + f.length());
+      DataNode.LOG.debug("writeToRbw metafile is " + metafile +
+                         " of size " + metafile.length());
+    }
+    return createBlockWriteStreams( f , metafile);
+
+  }
+  
+  @Override
+  public BlockWriteStreams writeToTemporary(Block b)
+      throws IOException {
+    ReplicaInfo replicaInfo = volumeMap.get(b);
+    if (replicaInfo != null) {
+      throw new BlockAlreadyExistsException("Block " + b +
+          " already exists in state " + replicaInfo.getState() +
+          " and thus cannot be created.");
+    }
+    
+    File f = null;
+    FSVolume v = volumes.getNextVolume(b.getNumBytes());
+    // create a temporary file to hold block in the designated volume
+    f = v.createTmpFile(b);
+    replicaInfo = new ReplicaInPipeline(b.getBlockId(), 
+        b.getGenerationStamp(), v, f.getParentFile());
+    volumeMap.add(replicaInfo);
+    
+    // return the output streams
+    File metafile = getMetaFile(f, b);
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("writeToTemp blockfile is " + f + 
+          " of size " + f.length());
+      DataNode.LOG.debug("writeToTemp metafile is " + metafile + 
+          " of size " + metafile.length());
+    }
     return createBlockWriteStreams( f , metafile);
   }
 
@@ -1280,8 +1268,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
                                  throws IOException {
     long size = 0;
     synchronized (this) {
-      FSVolume vol = getReplicaInfo(b).getVolume();
-      size = vol.getTmpFile(b).length();
+      size = getReplicaInfo(b).getBlockFile().length();
     }
     if (size < dataOffset) {
       String msg = "Trying to change block file offset of block " + b +
@@ -1306,6 +1293,16 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     return vol.createTmpFile(blk);
   }
 
+  synchronized File createRbwFile( FSVolume vol, Block blk ) throws IOException {
+    if ( vol == null ) {
+      vol = getReplicaInfo( blk ).getVolume();
+      if ( vol == null ) {
+        throw new IOException("Could not find volume for block " + blk);
+      }
+    }
+    return vol.createRbwFile(blk);
+  }
+
   //
   // REMIND - mjc - eventually we should have a timeout system
   // in place to clean up block files left by abandoned clients.
@@ -1391,7 +1388,9 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     ArrayList<Block> list =  new ArrayList<Block>(volumeMap.size());
     synchronized(this) {
       for (ReplicaInfo b : volumeMap.replicas()) {
-        if (b.getState() == ReplicaState.FINALIZED) {
+        if (b.getState() == ReplicaState.FINALIZED ) {
+          list.add(new Block(b));
+        } else if (supportAppends && b.getState() == ReplicaState.RWR) {
           list.add(new Block(b));
         }
       }

+ 29 - 4
src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java

@@ -144,6 +144,10 @@ public interface FSDatasetInterface extends FSDatasetMBean {
         checksumOut = cOut;
       }
       
+      void close() throws IOException {
+        IOUtils.closeStream(dataOut);
+        IOUtils.closeStream(checksumOut);
+      }
     }
 
   /**
@@ -167,14 +171,35 @@ public interface FSDatasetInterface extends FSDatasetMBean {
   }
     
   /**
-   * Creates the block and returns output streams to write data and CRC
-   * @param b
-   * @param isRecovery True if this is part of erro recovery, otherwise false
+   * Creates a temporary replica and returns output streams to write data and CRC
+   * 
+   * @param b block
+   * @return a BlockWriteStreams object to allow writing the block data
+   *  and CRC
+   * @throws IOException if an error occurs
+   */
+  public BlockWriteStreams writeToTemporary(Block b) throws IOException;
+
+  /**
+   * Creates/recovers a RBW replica and returns output streams to 
+   * write data and CRC
+   * 
+   * @param b block
+   * @param isRecovery True if this is part of error recovery, otherwise false
+   * @return a BlockWriteStreams object to allow writing the block data
+   *  and CRC
+   * @throws IOException if an error occurs
+   */
+  public BlockWriteStreams writeToRbw(Block b, boolean isRecovery) throws IOException;
+
+  /**
+   * Append to a finalized replica and returns output streams to write data and CRC
+   * @param b block
    * @return a BlockWriteStreams object to allow writing the block data
    *  and CRC
    * @throws IOException
    */
-  public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException;
+  public BlockWriteStreams append(Block b) throws IOException;
 
   /**
    * Update the block to the new generation stamp and length.  
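Taken together, the new methods imply the following replica life cycle for a client write followed by an append. A hedged sketch only (data is an FSDatasetInterface and b a Block, as elsewhere in this patch; it uses the finalizeBlock(..) call already present in the interface; checksum writing and error handling are omitted; not part of the patch):

    // create an RBW replica and write to it
    BlockWriteStreams streams = data.writeToRbw(b, false);
    // ... write data via streams.dataOut and checksums via streams.checksumOut ...
    streams.close();
    data.finalizeBlock(b);                    // RBW -> FINALIZED

    // re-open the finalized replica for append; it becomes RBW again
    BlockWriteStreams appendStreams = data.append(b);
    appendStreams.close();
    data.finalizeBlock(b);                    // back to FINALIZED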

+ 6 - 8
src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java

@@ -19,8 +19,6 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.List;
-
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
@@ -47,11 +45,11 @@ class ReplicaBeingWritten extends ReplicaInPipeline {
    * @param block a block
    * @param vol volume where replica is located
    * @param dir directory path where block and meta files are located
-   * @param threads a list of threads that are writing to this replica
+   * @param writer a thread that is writing to this replica
    */
   ReplicaBeingWritten(Block block, 
-      FSVolume vol, File dir, List<Thread> threads) {
-    super( block, vol, dir, threads);
+      FSVolume vol, File dir, Thread writer) {
+    super( block, vol, dir, writer);
   }
 
   /**
@@ -61,11 +59,11 @@ class ReplicaBeingWritten extends ReplicaInPipeline {
    * @param genStamp replica generation stamp
    * @param vol volume where replica is located
    * @param dir directory path where block and meta files are located
-   * @param threads a list of threads that are writing to this replica
+   * @param writer a thread that is writing to this replica
    */
   ReplicaBeingWritten(long blockId, long len, long genStamp,
-      FSVolume vol, File dir, List<Thread> threads ) {
-    super( blockId, len, genStamp, vol, dir, threads);
+      FSVolume vol, File dir, Thread writer ) {
+    super( blockId, len, genStamp, vol, dir, writer);
   }
   
   @Override   //ReplicaInfo

+ 27 - 25
src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java

@@ -19,8 +19,6 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
@@ -37,7 +35,7 @@ import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
 class ReplicaInPipeline extends ReplicaInfo {
   private long bytesAcked;
   private long bytesOnDisk;
-  private List<Thread> threads = new ArrayList<Thread>();
+  private Thread writer;
   
   /**
    * Constructor for a zero length replica
@@ -49,7 +47,7 @@ class ReplicaInPipeline extends ReplicaInfo {
    */
     ReplicaInPipeline(long blockId, long genStamp, 
         FSVolume vol, File dir) {
-    this( blockId, 0L, genStamp, vol, dir, null);
+    this( blockId, 0L, genStamp, vol, dir, Thread.currentThread());
   }
 
   /**
@@ -57,12 +55,12 @@ class ReplicaInPipeline extends ReplicaInfo {
    * @param block a block
    * @param vol volume where replica is located
    * @param dir directory path where block and meta files are located
-   * @param threads a list of threads that are writing to this replica
+   * @param writer a thread that is writing to this replica
    */
   ReplicaInPipeline(Block block, 
-      FSVolume vol, File dir, List<Thread> threads) {
+      FSVolume vol, File dir, Thread writer) {
     this( block.getBlockId(), block.getNumBytes(), block.getGenerationStamp(),
-        vol, dir, threads);
+        vol, dir, writer);
   }
 
   /**
@@ -72,15 +70,14 @@ class ReplicaInPipeline extends ReplicaInfo {
    * @param genStamp replica generation stamp
    * @param vol volume where replica is located
    * @param dir directory path where block and meta files are located
-   * @param threads a list of threads that are writing to this replica
+   * @param writer a thread that is writing to this replica
    */
   ReplicaInPipeline(long blockId, long len, long genStamp,
-      FSVolume vol, File dir, List<Thread> threads ) {
+      FSVolume vol, File dir, Thread writer ) {
     super( blockId, len, genStamp, vol, dir);
     this.bytesAcked = len;
     this.bytesOnDisk = len;
-    setThreads(threads);
-    this.threads.add(Thread.currentThread());
+    this.writer = writer;
   }
 
   @Override  //ReplicaInfo
@@ -127,21 +124,11 @@ class ReplicaInPipeline extends ReplicaInfo {
   }
   
   /**
-   * Set the threads that are writing to this replica
-   * @param threads a list of threads writing to this replica
+   * Set the thread that is writing to this replica
+   * @param writer a thread writing to this replica
    */
-  public void setThreads(List<Thread> threads) {
-    if (threads != null) {
-      threads.addAll(threads);
-    }
-  }
-  
-  /**
-   * Get a list of threads writing to this replica 
-   * @return a list of threads writing to this replica
-   */
-  public List<Thread> getThreads() {
-    return threads;
+  public void setWriter(Thread writer) {
+    this.writer = writer;
   }
   
   @Override  // Object
@@ -149,6 +136,21 @@ class ReplicaInPipeline extends ReplicaInfo {
     return super.equals(o);
   }
   
+  /**
+   * Interrupt the writing thread and wait until it dies
+   * @throws IOException the waiting is interrupted
+   */
+  void stopWriter() throws IOException {
+    if (writer != null && writer != Thread.currentThread() && writer.isAlive()) {
+      writer.interrupt();
+      try {
+        writer.join();
+      } catch (InterruptedException e) {
+        throw new IOException("Waiting for writer thread is interrupted.");
+      }
+    }
+  }
+  
   @Override  // Object
   public int hashCode() {
     return super.hashCode();

+ 2 - 2
src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java

@@ -39,8 +39,8 @@ class ReplicaUnderRecovery extends ReplicaInfo {
   ReplicaUnderRecovery(ReplicaInfo replica, long recoveryId) {
     super(replica.getBlockId(), replica.getNumBytes(), replica.getGenerationStamp(),
         replica.getVolume(), replica.getDir());
-    if ( replica.getState() != ReplicaState.FINALIZED ||
-         replica.getState() != ReplicaState.RBW ||
+    if ( replica.getState() != ReplicaState.FINALIZED &&
+         replica.getState() != ReplicaState.RBW &&
          replica.getState() != ReplicaState.RWR ) {
       throw new IllegalArgumentException("Cannot recover replica: " + replica);
     }

+ 18 - 1
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -380,7 +380,24 @@ public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Con
     return getStorageInfo();
   }
 
-  public synchronized BlockWriteStreams writeToBlock(Block b, 
+  @Override
+  public BlockWriteStreams append(Block b) throws IOException {
+    return writeToBlock(b, true);
+  }
+
+  @Override
+  public synchronized BlockWriteStreams writeToRbw(Block b, boolean isRecovery)
+      throws IOException {
+    return writeToBlock(b, isRecovery);
+  }
+
+  @Override
+  public synchronized BlockWriteStreams writeToTemporary(Block b)
+      throws IOException {
+    return writeToBlock(b, false);
+  }
+
+  private synchronized BlockWriteStreams writeToBlock(Block b, 
                                             boolean isRecovery)
                                             throws IOException {
     if (isValidBlock(b)) {

+ 10 - 12
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java

@@ -33,12 +33,13 @@ import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
 import org.apache.hadoop.io.IOUtils;
 
-import junit.framework.TestCase;
+import org.junit.Test;
+import org.junit.Assert;
 
 /** Test if a datanode can correctly upgrade itself */
-public class TestDatanodeRestart extends TestCase {
+public class TestDatanodeRestart {
   // test finalized replicas persist across DataNode restarts
-  public void testFinalizedReplicas() throws Exception {
+  @Test public void testFinalizedReplicas() throws Exception {
     // bring up a cluster of 3
     Configuration conf = new Configuration();
     conf.setLong("dfs.block.size", 1024L);
@@ -62,7 +63,7 @@ public class TestDatanodeRestart extends TestCase {
   }
   
   // test rbw replicas persist across DataNode restarts
-  public void testRbwReplicas() throws IOException {
+  @Test public void testRbwReplicas() throws IOException {
     Configuration conf = new Configuration();
     conf.setLong("dfs.block.size", 1024L);
     conf.setInt("dfs.write.packet.size", 512);
@@ -91,16 +92,13 @@ public class TestDatanodeRestart extends TestCase {
       out.write(writeBuf);
       out.sync();
       DataNode dn = cluster.getDataNodes().get(0);
-      // move tmp replicas to be rbw replicas: this is a temporary trick
       for (FSVolume volume : ((FSDataset)dn.data).volumes.volumes) {
         File currentDir = volume.getDir().getParentFile();
-        File tmpDir = new File(currentDir.getParentFile(), "tmp");
         File rbwDir = new File(currentDir, "rbw");
-        for (File file : tmpDir.listFiles()) {
+        for (File file : rbwDir.listFiles()) {
           if (isCorrupt && Block.isBlockFilename(file)) {
             new RandomAccessFile(file, "rw").setLength(fileLen-1); // corrupt
           }
-          file.renameTo(new File(rbwDir, file.getName()));
         }
       }
       cluster.restartDataNodes();
@@ -109,13 +107,13 @@ public class TestDatanodeRestart extends TestCase {
 
       // check volumeMap: one rwr replica
       ReplicasMap replicas = ((FSDataset)(dn.data)).volumeMap;
-      assertEquals(1, replicas.size());
+      Assert.assertEquals(1, replicas.size());
       ReplicaInfo replica = replicas.replicas().iterator().next();
-      assertEquals(ReplicaState.RWR, replica.getState());
+      Assert.assertEquals(ReplicaState.RWR, replica.getState());
       if (isCorrupt) {
-        assertEquals((fileLen-1)/512*512, replica.getNumBytes());
+        Assert.assertEquals((fileLen-1)/512*512, replica.getNumBytes());
       } else {
-        assertEquals(fileLen, replica.getNumBytes());
+        Assert.assertEquals(fileLen, replica.getNumBytes());
       }
       dn.data.invalidate(new Block[]{replica});
       fs.delete(src, false);

+ 4 - 4
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java

@@ -57,8 +57,8 @@ public class TestDiskError extends TestCase {
     FileSystem fs = cluster.getFileSystem();
     final int dnIndex = 0;
     String dataDir = cluster.getDataDirectory();
-    File dir1 = new File(new File(dataDir, "data"+(2*dnIndex+1)), "tmp");
-    File dir2 = new File(new File(dataDir, "data"+(2*dnIndex+2)), "tmp");
+    File dir1 = new File(new File(dataDir, "data"+(2*dnIndex+1)), "current/rbw");
+    File dir2 = new File(new File(dataDir, "data"+(2*dnIndex+2)), "current/rbw");
     try {
       // make the data directory of the first datanode to be readonly
       assertTrue(dir1.setReadOnly());
@@ -135,8 +135,8 @@ public class TestDiskError extends TestCase {
       
       // the temporary block & meta files should be deleted
       String dataDir = cluster.getDataDirectory();
-      File dir1 = new File(new File(dataDir, "data"+(2*sndNode+1)), "tmp");
-      File dir2 = new File(new File(dataDir, "data"+(2*sndNode+2)), "tmp");
+      File dir1 = new File(new File(dataDir, "data"+(2*sndNode+1)), "current/rbw");
+      File dir2 = new File(new File(dataDir, "data"+(2*sndNode+2)), "current/rbw");
       while (dir1.listFiles().length != 0 || dir2.listFiles().length != 0) {
         Thread.sleep(100);
       }

+ 1 - 1
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java

@@ -62,7 +62,7 @@ public class TestSimulatedFSDataset extends TestCase {
     int bytesAdded = 0;
     for (int i = startingBlockId; i < startingBlockId+NUMBLOCKS; ++i) {
       Block b = new Block(i, 0, 0); // we pass expected len as zero, - fsdataset should use the sizeof actual data written
-      OutputStream dataOut  = fsdataset.writeToBlock(b, false).dataOut;
+      OutputStream dataOut  = fsdataset.writeToRbw(b, false).dataOut;
       assertEquals(0, fsdataset.getLength(b));
       for (int j=1; j <= blockIdToLen(i); ++j) {
         dataOut.write(j);

+ 288 - 0
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java

@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test FSDataset#append, writeToRbw, and writeToTemporary */
+public class TestWriteToReplica {
+  final private static Block[] blocks = new Block[] {
+    new Block(1, 1, 2001), new Block(2, 1, 2002), 
+    new Block(3, 1, 2003), new Block(4, 1, 2004),
+    new Block(5, 1, 2005), new Block(6, 1, 2006)
+  };
+  final private static int FINALIZED = 0;
+  final private static int TEMPORARY = 1;
+  final private static int RBW = 2;
+  final private static int RWR = 3;
+  final private static int RUR = 4;
+  final private static int NON_EXISTENT = 5;
+  
+  // test append
+  @Test
+  public void testAppend() throws Exception {
+    MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 1, true, null);
+    try {
+      cluster.waitActive();
+      DataNode dn = cluster.getDataNodes().get(0);
+      FSDataset dataSet = (FSDataset)dn.data;
+
+      // set up replicasMap
+      setup(dataSet);
+
+      // test append
+      testAppend(dataSet);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  // test writeToRbw
+  @Test
+  public void testWriteToRbw() throws Exception {
+    MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 1, true, null);
+    try {
+      cluster.waitActive();
+      DataNode dn = cluster.getDataNodes().get(0);
+      FSDataset dataSet = (FSDataset)dn.data;
+
+      // set up replicasMap
+      setup(dataSet);
+
+      // test writeToRbw
+      testWriteToRbw(dataSet);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+  // test writeToTemporary
+  @Test
+  public void testWriteToTemporary() throws Exception {
+    MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 1, true, null);
+    try {
+      cluster.waitActive();
+      DataNode dn = cluster.getDataNodes().get(0);
+      FSDataset dataSet = (FSDataset)dn.data;
+
+      // set up replicasMap
+      setup(dataSet);
+
+      // test writeToTemporary
+      testWriteToTemporary(dataSet);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+  private void setup(FSDataset dataSet) throws IOException {
+    // setup replicas map
+    ReplicasMap replicasMap = dataSet.volumeMap;
+    FSVolume vol = dataSet.volumes.getNextVolume(0);
+    ReplicaInfo replicaInfo = new FinalizedReplica(
+        blocks[FINALIZED], vol, vol.getDir());
+    replicasMap.add(replicaInfo);
+    replicaInfo.getBlockFile().createNewFile();
+    replicaInfo.getMetaFile().createNewFile();
+    replicasMap.add(new ReplicaInPipeline(blocks[TEMPORARY].getBlockId(),
+        blocks[TEMPORARY].getGenerationStamp(), vol, 
+        vol.createTmpFile(blocks[TEMPORARY]).getParentFile()));
+    replicasMap.add(new ReplicaBeingWritten(blocks[RBW].getBlockId(),
+        blocks[RBW].getGenerationStamp(), vol, 
+        vol.createRbwFile(blocks[RBW]).getParentFile()));
+    replicasMap.add(new ReplicaWaitingToBeRecovered(blocks[RWR], vol, 
+        vol.createRbwFile(blocks[RWR]).getParentFile()));
+    replicasMap.add(new ReplicaUnderRecovery(
+        new FinalizedReplica(blocks[RUR], vol, vol.getDir()), 2007));    
+  }
+  
+  private void testAppend(FSDataset dataSet) throws IOException {
+    dataSet.append(blocks[FINALIZED]).close();  // successful
+    
+    try {
+      dataSet.append(blocks[TEMPORARY]).close();
+      Assert.fail("Should not have appended to a temporary replica " 
+          + blocks[TEMPORARY]);
+    } catch (BlockNotFoundException e) {
+      Assert.assertEquals(BlockNotFoundException.UNFINALIZED_REPLICA +
+          blocks[TEMPORARY], e.getMessage());
+    }
+
+    try {
+      dataSet.append(blocks[RBW]).close();
+      Assert.fail("Should not have appended to an RBW replica" + blocks[RBW]);
+    } catch (BlockNotFoundException e) {
+      Assert.assertEquals(BlockNotFoundException.UNFINALIZED_REPLICA +
+          blocks[RBW], e.getMessage());
+    }
+
+    try {
+      dataSet.append(blocks[RWR]).close();
+      Assert.fail("Should not have appended to an RWR replica" + blocks[RWR]);
+    } catch (BlockNotFoundException e) {
+      Assert.assertEquals(BlockNotFoundException.UNFINALIZED_REPLICA +
+          blocks[RWR], e.getMessage());
+    }
+
+    try {
+      dataSet.append(blocks[RUR]).close();
+      Assert.fail("Should not have appended to an RUR replica" + blocks[RUR]);
+    } catch (BlockNotFoundException e) {
+      Assert.assertEquals(BlockNotFoundException.UNFINALIZED_REPLICA +
+          blocks[RUR], e.getMessage());
+    }
+
+    try {
+      dataSet.append(blocks[NON_EXISTENT]).close();
+      Assert.fail("Should not have appended to a non-existent replica " + 
+          blocks[NON_EXISTENT]);
+    } catch (BlockNotFoundException e) {
+      Assert.assertEquals(BlockNotFoundException.NON_EXISTENT_REPLICA + 
+          blocks[NON_EXISTENT], e.getMessage());
+    }
+  }
+
+  private void testWriteToRbw(FSDataset dataSet) throws IOException {
+    try {
+      dataSet.writeToRbw(blocks[FINALIZED], true).close();
+      Assert.fail("Should not have recovered a finalized replica " +
+          blocks[FINALIZED]);
+    } catch (BlockNotFoundException e) {
+      Assert.assertEquals(BlockNotFoundException.NON_RBW_REPLICA + 
+          blocks[FINALIZED], e.getMessage());
+    }
+ 
+    try {
+      dataSet.writeToRbw(blocks[FINALIZED], false).close();
+      Assert.fail("Should not have created a replica that's already " +
+      		"finalized " + blocks[FINALIZED]);
+    } catch (BlockAlreadyExistsException e) {
+    }
+ 
+    try {
+      dataSet.writeToRbw(blocks[TEMPORARY], true).close();
+      Assert.fail("Should not have recovered a temporary replica " +
+          blocks[TEMPORARY]);
+    } catch (BlockNotFoundException e) {
+      Assert.assertEquals(BlockNotFoundException.NON_RBW_REPLICA + 
+          blocks[TEMPORARY], e.getMessage());
+    }
+
+    try {
+      dataSet.writeToRbw(blocks[TEMPORARY], false).close();
+      Assert.fail("Should not have created a replica that had created as " +
+      		"temporary " + blocks[TEMPORARY]);
+    } catch (BlockAlreadyExistsException e) {
+    }
+        
+    dataSet.writeToRbw(blocks[RBW], true).close();  // expect to be successful
+    
+    try {
+      dataSet.writeToRbw(blocks[RBW], false).close();
+      Assert.fail("Should not have created a replica that had created as RBW " +
+          blocks[RBW]);
+    } catch (BlockAlreadyExistsException e) {
+    }
+    
+    try {
+      dataSet.writeToRbw(blocks[RWR], true).close();
+      Assert.fail("Should not have recovered a RWR replica " + blocks[RWR]);
+    } catch (BlockNotFoundException e) {
+      Assert.assertEquals(BlockNotFoundException.NON_RBW_REPLICA + 
+          blocks[RWR], e.getMessage());
+    }
+
+    try {
+      dataSet.writeToRbw(blocks[RWR], false).close();
+      Assert.fail("Should not have created a replica that was waiting to be " +
+      		"recovered " + blocks[RWR]);
+    } catch (BlockAlreadyExistsException e) {
+    }
+    
+    try {
+      dataSet.writeToRbw(blocks[RUR], true).close();
+      Assert.fail("Should not have recovered a RUR replica " + blocks[RUR]);
+    } catch (BlockNotFoundException e) {
+      Assert.assertEquals(BlockNotFoundException.NON_RBW_REPLICA + 
+          blocks[RUR], e.getMessage());
+    }
+
+    try {
+      dataSet.writeToRbw(blocks[RUR], false).close();
+      Assert.fail("Should not have created a replica that was under recovery " +
+          blocks[RUR]);
+    } catch (BlockAlreadyExistsException e) {
+    }
+    
+    dataSet.writeToRbw(blocks[NON_EXISTENT], true).close();
+    
+    // remove this replica
+    ReplicaInfo removedReplica = dataSet.volumeMap.remove(blocks[NON_EXISTENT]);
+    removedReplica.getBlockFile().delete();
+    removedReplica.getMetaFile().delete();
+    
+    dataSet.writeToRbw(blocks[NON_EXISTENT], false).close();
+  }
+  
+  private void testWriteToTemporary(FSDataset dataSet) throws IOException {
+    try {
+      dataSet.writeToTemporary(blocks[FINALIZED]).close();
+      Assert.fail("Should not have created a temporary replica that was " +
+      		"finalized " + blocks[FINALIZED]);
+    } catch (BlockAlreadyExistsException e) {
+    }
+ 
+    try {
+      dataSet.writeToTemporary(blocks[TEMPORARY]).close();
+      Assert.fail("Should not have created a replica that had created as" +
+      		"temporary " + blocks[TEMPORARY]);
+    } catch (BlockAlreadyExistsException e) {
+    }
+    
+    try {
+      dataSet.writeToTemporary(blocks[RBW]).close();
+      Assert.fail("Should not have created a replica that had created as RBW " +
+          blocks[RBW]);
+    } catch (BlockAlreadyExistsException e) {
+    }
+    
+    try {
+      dataSet.writeToTemporary(blocks[RWR]).close();
+      Assert.fail("Should not have created a replica that was waiting to be " +
+      		"recovered " + blocks[RWR]);
+    } catch (BlockAlreadyExistsException e) {
+    }
+    
+    try {
+      dataSet.writeToTemporary(blocks[RUR]).close();
+      Assert.fail("Should not have created a replica that was under recovery " +
+          blocks[RUR]);
+    } catch (BlockAlreadyExistsException e) {
+    }
+    
+    dataSet.writeToTemporary(blocks[NON_EXISTENT]).close();
+  }
+}