
HDFS-6898. DN must reserve space for a full block when an RBW block is created. (Contributed by Arpit Agarwal)

arp committed 10 years ago · commit d1fa58292e
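
In short: when the DataNode creates a replica being written (RBW), the volume now reserves disk space for the full estimated block length up front. As bytes are acknowledged by the write pipeline, an equivalent amount of the reservation is released, and whatever remains is handed back when the block is finalized, so short blocks do not pin space indefinitely. The volume's available-space calculation subtracts the outstanding reservation, which prevents admitting a writer whose block the volume cannot actually hold.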

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -612,6 +612,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6862. Add missing timeout annotations to tests. (Xiaoyu Yao via
     Arpit Agarwal)
 
+    HDFS-6898. DN must reserve space for a full block when an RBW block is
+    created. (Arpit Agarwal)
+
     BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
   
       HDFS-6387. HDFS CLI admin tool for creating & deleting an

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java

@@ -48,7 +48,7 @@ public class HdfsConstants {
       "org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol";
   
   
-  public static final int MIN_BLOCKS_FOR_WRITE = 5;
+  public static final int MIN_BLOCKS_FOR_WRITE = 1;
 
   // Long that indicates "leave current quota unchanged"
   public static final long QUOTA_DONT_SET = Long.MAX_VALUE;
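
Dropping MIN_BLOCKS_FOR_WRITE from 5 to 1 follows from the rest of the patch: presumably the old value was a heuristic safety margin used when selecting a target volume, and it is no longer needed once each RBW explicitly reserves a full block's worth of space.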

+ 8 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java

@@ -34,10 +34,12 @@ public class ReplicaBeingWritten extends ReplicaInPipeline {
    * @param genStamp replica generation stamp
    * @param vol volume where replica is located
    * @param dir directory path where block and meta files are located
+   * @param bytesToReserve disk space to reserve for this replica, based on
+   *                       the estimated maximum block length.
    */
   public ReplicaBeingWritten(long blockId, long genStamp, 
-        FsVolumeSpi vol, File dir) {
-    super( blockId, genStamp, vol, dir);
+        FsVolumeSpi vol, File dir, long bytesToReserve) {
+    super(blockId, genStamp, vol, dir, bytesToReserve);
   }
   
   /**
@@ -60,10 +62,12 @@ public class ReplicaBeingWritten extends ReplicaInPipeline {
    * @param vol volume where replica is located
    * @param dir directory path where block and meta files are located
    * @param writer a thread that is writing to this replica
+   * @param bytesToReserve disk space to reserve for this replica, based on
+   *                       the estimated maximum block length.
    */
   public ReplicaBeingWritten(long blockId, long len, long genStamp,
-      FsVolumeSpi vol, File dir, Thread writer ) {
-    super( blockId, len, genStamp, vol, dir, writer);
+      FsVolumeSpi vol, File dir, Thread writer, long bytesToReserve) {
+    super(blockId, len, genStamp, vol, dir, writer, bytesToReserve);
   }
 
   /**

+ 29 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java

@@ -44,6 +44,13 @@ public class ReplicaInPipeline extends ReplicaInfo
   private long bytesOnDisk;
   private byte[] lastChecksum;  
   private Thread writer;
+
+  /**
+   * Bytes reserved for this replica on the containing volume.
+   * Based on the difference between the estimated maximum block length and
+   * the bytes already written to this block.
+   */
+  private long bytesReserved;
   
   /**
    * Constructor for a zero length replica
@@ -51,10 +58,12 @@ public class ReplicaInPipeline extends ReplicaInfo
    * @param genStamp replica generation stamp
    * @param vol volume where replica is located
    * @param dir directory path where block and meta files are located
+   * @param bytesToReserve disk space to reserve for this replica, based on
+   *                       the estimated maximum block length.
    */
   public ReplicaInPipeline(long blockId, long genStamp, 
-        FsVolumeSpi vol, File dir) {
-    this( blockId, 0L, genStamp, vol, dir, Thread.currentThread());
+        FsVolumeSpi vol, File dir, long bytesToReserve) {
+    this(blockId, 0L, genStamp, vol, dir, Thread.currentThread(), bytesToReserve);
   }
 
   /**
@@ -67,7 +76,7 @@ public class ReplicaInPipeline extends ReplicaInfo
   ReplicaInPipeline(Block block, 
       FsVolumeSpi vol, File dir, Thread writer) {
     this( block.getBlockId(), block.getNumBytes(), block.getGenerationStamp(),
-        vol, dir, writer);
+        vol, dir, writer, 0L);
   }
 
   /**
@@ -78,13 +87,16 @@ public class ReplicaInPipeline extends ReplicaInfo
    * @param vol volume where replica is located
    * @param dir directory path where block and meta files are located
    * @param writer a thread that is writing to this replica
+   * @param bytesToReserve disk space to reserve for this replica, based on
+   *                       the estimated maximum block length.
    */
   ReplicaInPipeline(long blockId, long len, long genStamp,
-      FsVolumeSpi vol, File dir, Thread writer ) {
+      FsVolumeSpi vol, File dir, Thread writer, long bytesToReserve) {
     super( blockId, len, genStamp, vol, dir);
     this.bytesAcked = len;
     this.bytesOnDisk = len;
     this.writer = writer;
+    this.bytesReserved = bytesToReserve;
   }
 
   /**
@@ -96,6 +108,7 @@ public class ReplicaInPipeline extends ReplicaInfo
     this.bytesAcked = from.getBytesAcked();
     this.bytesOnDisk = from.getBytesOnDisk();
     this.writer = from.writer;
+    this.bytesReserved = from.bytesReserved;
   }
 
   @Override
@@ -115,13 +128,25 @@ public class ReplicaInPipeline extends ReplicaInfo
   
   @Override // ReplicaInPipelineInterface
   public void setBytesAcked(long bytesAcked) {
+    long newBytesAcked = bytesAcked - this.bytesAcked;
     this.bytesAcked = bytesAcked;
+
+    // Once bytes are ACK'ed we can release equivalent space from the
+    // volume's reservedForRbw count. We could have released it as soon
+    // as the write-to-disk completed but that would be inefficient.
+    getVolume().releaseReservedSpace(newBytesAcked);
+    bytesReserved -= newBytesAcked;
   }
   
   @Override // ReplicaInPipelineInterface
   public long getBytesOnDisk() {
     return bytesOnDisk;
   }
+
+  @Override
+  public long getBytesReserved() {
+    return bytesReserved;
+  }
   
   @Override // ReplicaInPipelineInterface
   public synchronized void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) {
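
To make the accounting concrete, here is a small self-contained sketch (a hypothetical class, not part of the patch) of how setBytesAcked turns cumulative acknowledgements into incremental releases against a full-block reservation:

// Stand-alone illustration; numbers assume a 128 MB estimated block length.
public class AckedDeltaExample {
  public static void main(String[] args) {
    final long blockLen = 128L * 1024 * 1024; // estimated maximum block length
    long reserved = blockLen;                 // reserved when the RBW was created
    long bytesAcked = 0;
    for (long ack : new long[] {1L << 20, 4L << 20, blockLen}) {
      long newlyAcked = ack - bytesAcked;     // the delta computed in setBytesAcked
      reserved -= newlyAcked;                 // mirrors getVolume().releaseReservedSpace()
      bytesAcked = ack;
      System.out.println("acked=" + bytesAcked + " stillReserved=" + reserved);
    }
    // Once the whole block has been acked, the reservation has drained to zero.
  }
}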

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java

@@ -222,6 +222,13 @@ abstract public class ReplicaInfo extends Block implements Replica {
   public void setUnlinked() {
     // no need to be unlinked
   }
+
+  /**
+   * Number of bytes reserved for this replica on disk.
+   */
+  public long getBytesReserved() {
+    return 0;
+  }
   
    /**
    * Copy specified file into a temporary file. Then rename the

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java

@@ -45,4 +45,15 @@ public interface FsVolumeSpi {
   public File getFinalizedDir(String bpid) throws IOException;
   
   public StorageType getStorageType();
+
+  /**
+   * Reserve disk space for an RBW block so a writer does not run out of
+   * space before the block is full.
+   */
+  public void reserveSpaceForRbw(long bytesToReserve);
+
+  /**
+   * Release disk space previously reserved for an RBW block.
+   */
+  public void releaseReservedSpace(long bytesToRelease);
 }
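
The interface javadoc is silent on synchronization, but implementations should expect concurrent calls, since several pipeline writers can hold RBW replicas on the same volume at once; the FsVolumeImpl implementation below uses an AtomicLong for exactly this reason. For fakes, a no-op stub (as in TestDirectoryScanner further down) is sufficient.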

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java

@@ -240,7 +240,7 @@ class BlockPoolSlice {
     return DatanodeUtil.createTmpFile(b, f);
   }
 
-  File addBlock(Block b, File f) throws IOException {
+  File addFinalizedBlock(Block b, File f) throws IOException {
     File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
     if (!blockDir.exists()) {
       if (!blockDir.mkdirs()) {
@@ -334,9 +334,11 @@ class BlockPoolSlice {
           // The restart meta file exists
           if (sc.hasNextLong() && (sc.nextLong() > Time.now())) {
             // It didn't expire. Load the replica as a RBW.
+            // We don't know the expected block length, so just use 0
+            // and don't reserve any more space for writes.
             newReplica = new ReplicaBeingWritten(blockId,
                 validateIntegrityAndSetLength(file, genStamp),
-                genStamp, volume, file.getParentFile(), null);
+                genStamp, volume, file.getParentFile(), null, 0);
             loadRwr = false;
           }
           sc.close();

+ 8 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -593,7 +593,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           + " from " + srcfile + " to " + dstfile.getAbsolutePath(), e);
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug("addBlock: Moved " + srcmeta + " to " + dstmeta
+      LOG.debug("addFinalizedBlock: Moved " + srcmeta + " to " + dstmeta
           + " and " + srcfile + " to " + dstfile);
     }
     return dstfile;
@@ -712,7 +712,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     File oldmeta = replicaInfo.getMetaFile();
     ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
         replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
-        v, newBlkFile.getParentFile(), Thread.currentThread());
+        v, newBlkFile.getParentFile(), Thread.currentThread(), estimateBlockLen);
     File newmeta = newReplicaInfo.getMetaFile();
 
     // rename meta file to rbw directory
@@ -748,7 +748,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     
     // Replace finalized replica by a RBW replica in replicas map
     volumeMap.add(bpid, newReplicaInfo);
-    
+    v.reserveSpaceForRbw(estimateBlockLen - replicaInfo.getNumBytes());
     return newReplicaInfo;
   }
 
@@ -876,7 +876,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     // create a rbw file to hold block in the designated volume
     File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
     ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), 
-        b.getGenerationStamp(), v, f.getParentFile());
+        b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
     volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
     return newReplicaInfo;
   }
@@ -992,7 +992,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     // create RBW
     final ReplicaBeingWritten rbw = new ReplicaBeingWritten(
         blockId, numBytes, expectedGs,
-        v, dest.getParentFile(), Thread.currentThread());
+        v, dest.getParentFile(), Thread.currentThread(), 0);
     rbw.setBytesAcked(visible);
     // overwrite the RBW in the volume map
     volumeMap.add(b.getBlockPoolId(), rbw);
@@ -1013,7 +1013,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     // create a temporary file to hold block in the designated volume
     File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
     ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), 
-        b.getGenerationStamp(), v, f.getParentFile());
+        b.getGenerationStamp(), v, f.getParentFile(), 0);
     volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
     
     return newReplicaInfo;
@@ -1079,7 +1079,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             " for block " + replicaInfo);
       }
 
-      File dest = v.addBlock(bpid, replicaInfo, f);
+      File dest = v.addFinalizedBlock(
+          bpid, replicaInfo, f, replicaInfo.getBytesReserved());
       newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
     }
     volumeMap.add(bpid, newReplicaInfo);
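
Taken together, the FsDatasetImpl changes cover each path that creates or retires an RBW: a fresh RBW reserves the full block length (via createRbwFile below), append reserves only the gap between the estimated block length and the bytes already on disk (for example, appending to a 1 MB replica in a 128 MB block reserves 127 MB more), the recovery path that rebuilds an RBW from an existing replica and the temporary-replica path pass 0 because no estimate is meaningful there, and finalization returns the replica's outstanding getBytesReserved() to the volume.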

+ 55 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java

@@ -28,6 +28,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -62,6 +63,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
   private final DF usage;           
   private final long reserved;
 
+  // Disk space reserved for open blocks.
+  private AtomicLong reservedForRbw;
+
   // Capacity configured. This is useful when we want to
   // limit the visible capacity for tests. If negative, then we just
   // query from the filesystem.
@@ -82,6 +86,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
     this.reserved = conf.getLong(
         DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
         DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT);
+    this.reservedForRbw = new AtomicLong(0L);
     this.currentDir = currentDir; 
     File parent = currentDir.getParentFile();
     this.usage = new DF(parent, conf);
@@ -166,13 +171,18 @@ public class FsVolumeImpl implements FsVolumeSpi {
 
   @Override
   public long getAvailable() throws IOException {
-    long remaining = getCapacity()-getDfsUsed();
+    long remaining = getCapacity() - getDfsUsed() - reservedForRbw.get();
     long available = usage.getAvailable();
     if (remaining > available) {
       remaining = available;
     }
     return (remaining > 0) ? remaining : 0;
   }
+
+  @VisibleForTesting
+  public long getReservedForRbw() {
+    return reservedForRbw.get();
+  }
     
   long getReserved(){
     return reserved;
@@ -217,16 +227,58 @@ public class FsVolumeImpl implements FsVolumeSpi {
     return getBlockPoolSlice(bpid).createTmpFile(b);
   }
 
+  @Override
+  public void reserveSpaceForRbw(long bytesToReserve) {
+    if (bytesToReserve != 0) {
+      if (FsDatasetImpl.LOG.isDebugEnabled()) {
+        FsDatasetImpl.LOG.debug("Reserving " + bytesToReserve + " on volume " + getBasePath());
+      }
+      reservedForRbw.addAndGet(bytesToReserve);
+    }
+  }
+
+  @Override
+  public void releaseReservedSpace(long bytesToRelease) {
+    if (bytesToRelease != 0) {
+      if (FsDatasetImpl.LOG.isDebugEnabled()) {
+        FsDatasetImpl.LOG.debug("Releasing " + bytesToRelease + " on volume " + getBasePath());
+      }
+
+      long oldReservation, newReservation;
+      do {
+        oldReservation = reservedForRbw.get();
+        newReservation = oldReservation - bytesToRelease;
+        if (newReservation < 0) {
+          // Failsafe: this should never occur in practice, but if it does we don't
+          // want to start advertising more space than we have available.
+          newReservation = 0;
+        }
+      } while (!reservedForRbw.compareAndSet(oldReservation, newReservation));
+    }
+  }
+
   /**
    * RBW files. They get moved to the finalized block directory when
    * the block is finalized.
    */
   File createRbwFile(String bpid, Block b) throws IOException {
+    reserveSpaceForRbw(b.getNumBytes());
     return getBlockPoolSlice(bpid).createRbwFile(b);
   }
 
-  File addBlock(String bpid, Block b, File f) throws IOException {
-    return getBlockPoolSlice(bpid).addBlock(b, f);
+  /**
+   * Move a finalized block into place, releasing the space that was
+   * reserved for it while the replica was being written.
+   *
+   * @param bytesReservedForRbw space that was reserved during
+   *     block creation; now that the block is being finalized we
+   *     can free it up.
+   * @return the block file in its finalized location
+   * @throws IOException
+   */
+  File addFinalizedBlock(String bpid, Block b,
+                         File f, long bytesReservedForRbw)
+      throws IOException {
+    releaseReservedSpace(bytesReservedForRbw);
+    return getBlockPoolSlice(bpid).addFinalizedBlock(b, f);
   }
 
   Executor getCacheExecutor() {
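
Two details of the FsVolumeImpl changes are worth noting. First, getAvailable() now subtracts reservedForRbw before comparing against the filesystem's free space, so reserved-but-unwritten bytes are invisible to block placement. Second, releaseReservedSpace uses a compare-and-set loop rather than a plain addAndGet(-bytesToRelease) so the counter can be clamped at zero; an accounting bug then degrades to under-reservation instead of inflating the advertised capacity. A minimal stand-alone sketch of the same pattern (hypothetical class name, not part of the patch):

import java.util.concurrent.atomic.AtomicLong;

// Sketch of the clamped-release pattern used by FsVolumeImpl above.
public class ClampedReservation {
  private final AtomicLong reservedForRbw = new AtomicLong(0);

  void reserve(long bytes) {
    reservedForRbw.addAndGet(bytes);
  }

  void release(long bytes) {
    long oldV;
    long newV;
    do {
      oldV = reservedForRbw.get();
      newV = Math.max(0L, oldV - bytes); // never let the counter go negative
    } while (!reservedForRbw.compareAndSet(oldV, newV));
  }

  public static void main(String[] args) {
    ClampedReservation r = new ClampedReservation();
    r.reserve(100);
    r.release(150); // an over-release...
    System.out.println(r.reservedForRbw.get()); // ...still leaves 0, not -50
  }
}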

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java

@@ -424,6 +424,14 @@ public class TestDirectoryScanner {
     public String getStorageID() {
       return "";
     }
+
+    @Override
+    public void reserveSpaceForRbw(long bytesToReserve) {
+    }
+
+    @Override
+    public void releaseReservedSpace(long bytesToRelease) {
+    }
   }
 
   private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi();

+ 288 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java

@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import org.apache.hadoop.fs.DU;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Daemon;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Ensure that the DN reserves disk space equivalent to a full block for
+ * replica being written (RBW).
+ */
+public class TestRbwSpaceReservation {
+  static final Log LOG = LogFactory.getLog(TestRbwSpaceReservation.class);
+
+  private static final short REPL_FACTOR = 1;
+  private static final int DU_REFRESH_INTERVAL_MSEC = 500;
+  private static final int STORAGES_PER_DATANODE = 1;
+  private static final int BLOCK_SIZE = 1024 * 1024;
+  private static final int SMALL_BLOCK_SIZE = 1024;
+
+  protected MiniDFSCluster cluster;
+  private Configuration conf;
+  private DistributedFileSystem fs = null;
+  private DFSClient client = null;
+  FsVolumeImpl singletonVolume = null;
+
+  private static Random rand = new Random();
+
+  private void initConfig(int blockSize) {
+    conf = new HdfsConfiguration();
+
+    // Refresh disk usage information frequently.
+    conf.setInt(FS_DU_INTERVAL_KEY, DU_REFRESH_INTERVAL_MSEC);
+    conf.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
+
+    // Disable the scanner
+    conf.setInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
+  }
+
+  static {
+    ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  private void startCluster(int blockSize, long perVolumeCapacity) throws IOException {
+    initConfig(blockSize);
+
+    cluster = new MiniDFSCluster
+        .Builder(conf)
+        .storagesPerDatanode(STORAGES_PER_DATANODE)
+        .numDataNodes(REPL_FACTOR)
+        .build();
+    fs = cluster.getFileSystem();
+    client = fs.getClient();
+    cluster.waitActive();
+
+    if (perVolumeCapacity >= 0) {
+      List<? extends FsVolumeSpi> volumes =
+          cluster.getDataNodes().get(0).getFSDataset().getVolumes();
+
+      assertThat(volumes.size(), is(1));
+      singletonVolume = ((FsVolumeImpl) volumes.get(0));
+      singletonVolume.setCapacityForTesting(perVolumeCapacity);
+    }
+  }
+
+  @After
+  public void shutdownCluster() throws IOException {
+    if (client != null) {
+      client.close();
+      client = null;
+    }
+
+    if (fs != null) {
+      fs.close();
+      fs = null;
+    }
+
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  private void createFileAndTestSpaceReservation(
+      final String fileNamePrefix, final int fileBlockSize)
+      throws IOException, InterruptedException {
+    // Enough for 1 block + meta files + some delta.
+    final long configuredCapacity = fileBlockSize * 2 - 1;
+    startCluster(BLOCK_SIZE, configuredCapacity);
+    FSDataOutputStream out = null;
+    Path path = new Path("/" + fileNamePrefix + ".dat");
+
+    try {
+      out = fs.create(path, false, 4096, (short) 1, fileBlockSize);
+
+      byte[] buffer = new byte[rand.nextInt(fileBlockSize / 4)];
+      out.write(buffer);
+      out.hsync();
+      int bytesWritten = buffer.length;
+
+      // Check that space was reserved for a full block minus the bytesWritten.
+      assertThat(singletonVolume.getReservedForRbw(),
+                 is((long) fileBlockSize - bytesWritten));
+      out.close();
+      out = null;
+
+      // Check that the reserved space has been released since we closed the
+      // file.
+      assertThat(singletonVolume.getReservedForRbw(), is(0L));
+
+      // Reopen the file for appends and write some more data.
+      out = fs.append(path);
+      out.write(buffer);
+      out.hsync();
+      bytesWritten += buffer.length;
+
+      // Check that space was again reserved for a full block minus the
+      // bytesWritten so far.
+      assertThat(singletonVolume.getReservedForRbw(),
+                 is((long) fileBlockSize - bytesWritten));
+
+      // Write once more and verify the reserved space again. This ensures
+      // that the reservation is progressively adjusted to account for bytes
+      // written to disk.
+      out.write(buffer);
+      out.hsync();
+      bytesWritten += buffer.length;
+      assertThat(singletonVolume.getReservedForRbw(),
+                 is((long) fileBlockSize - bytesWritten));
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+  }
+
+  @Test (timeout=300000)
+  public void testWithDefaultBlockSize()
+      throws IOException, InterruptedException {
+    createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE);
+  }
+
+  @Test (timeout=300000)
+  public void testWithNonDefaultBlockSize()
+      throws IOException, InterruptedException {
+    // Same test as previous one, but with a non-default block size.
+    createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE * 2);
+  }
+
+  /**
+   * Stress test to ensure we are not leaking reserved space.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test (timeout=600000)
+  public void stressTest() throws IOException, InterruptedException {
+    final int numWriters = 5;
+    startCluster(SMALL_BLOCK_SIZE, SMALL_BLOCK_SIZE * numWriters * 10);
+    Writer[] writers = new Writer[numWriters];
+
+    // Start a few writers and let them run for a while.
+    for (int i = 0; i < numWriters; ++i) {
+      writers[i] = new Writer(client, SMALL_BLOCK_SIZE);
+      writers[i].start();
+    }
+
+    Thread.sleep(60000);
+
+    // Stop the writers.
+    for (Writer w : writers) {
+      w.stopWriter();
+    }
+    int filesCreated = 0;
+    int numFailures = 0;
+    for (Writer w : writers) {
+      w.join();
+      filesCreated += w.getFilesCreated();
+      numFailures += w.getNumFailures();
+    }
+
+    LOG.info("Stress test created " + filesCreated +
+             " files and hit " + numFailures + " failures");
+
+    // Check no space was leaked.
+    assertThat(singletonVolume.getReservedForRbw(), is(0L));
+  }
+
+  private static class Writer extends Daemon {
+    private volatile boolean keepRunning;
+    private final DFSClient localClient;
+    private int filesCreated = 0;
+    private int numFailures = 0;
+    byte[] data;
+
+    Writer(DFSClient client, int blockSize) throws IOException {
+      localClient = client;
+      keepRunning = true;
+      filesCreated = 0;
+      numFailures = 0;
+
+      // At least some of the files should span a block boundary.
+      data = new byte[blockSize * 2];
+    }
+
+    @Override
+    public void run() {
+      /*
+       * Create a file, write up to two blocks of data and close the file.
+       * Do this in a loop until we are told to stop.
+       */
+      while (keepRunning) {
+        OutputStream os = null;
+        try {
+          String filename = "/file-" + rand.nextLong();
+          os = localClient.create(filename, false);
+          os.write(data, 0, rand.nextInt(data.length));
+          IOUtils.closeQuietly(os);
+          os = null;
+          localClient.delete(filename, false);
+          Thread.sleep(50);     // Sleep for a bit to avoid killing the system.
+          ++filesCreated;
+        } catch (IOException ioe) {
+          // Just ignore the exception and keep going.
+          ++numFailures;
+        } catch (InterruptedException ie) {
+          return;
+        } finally {
+          if (os != null) {
+            IOUtils.closeQuietly(os);
+          }
+        }
+      }
+    }
+
+    public void stopWriter() {
+      keepRunning = false;
+    }
+
+    public int getFilesCreated() {
+      return filesCreated;
+    }
+
+    public int getNumFailures() {
+      return numFailures;
+    }
+  }
+}

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java

@@ -158,7 +158,7 @@ public class TestWriteToReplica {
     replicasMap.add(bpid, new ReplicaInPipeline(
         blocks[TEMPORARY].getBlockId(),
         blocks[TEMPORARY].getGenerationStamp(), vol, 
-        vol.createTmpFile(bpid, blocks[TEMPORARY].getLocalBlock()).getParentFile()));
+        vol.createTmpFile(bpid, blocks[TEMPORARY].getLocalBlock()).getParentFile(), 0));
     
     replicaInfo = new ReplicaBeingWritten(blocks[RBW].getLocalBlock(), vol, 
         vol.createRbwFile(bpid, blocks[RBW].getLocalBlock()).getParentFile(), null);