Selaa lähdekoodia

HDFS-8072. Reserved RBW space is not released if client terminates while writing block. (Arpit Agarwal)

Arpit Agarwal 10 vuotta sitten
vanhempi
commit
608c499841

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -1403,6 +1403,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-8038. PBImageDelimitedTextWriter#getEntry output HDFS path in
     platform-specific format. (Xiaoyu Yao via cnauroth)
 
+    HDFS-8072. Reserved RBW space is not released if client terminates while
+    writing block. (Arpit Agarwal)
+
     BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
 
       HDFS-7720. Quota by Storage Type API, tools and ClientNameNode

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -817,6 +817,7 @@ class BlockReceiver implements Closeable {
       }
 
     } catch (IOException ioe) {
+      replicaInfo.releaseAllBytesReserved();
       if (datanode.isRestarting()) {
         // Do not throw if shutting down for restart. Otherwise, it will cause
         // premature termination of responder.

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java

@@ -148,6 +148,12 @@ public class ReplicaInPipeline extends ReplicaInfo
     return bytesReserved;
   }
   
+  @Override
+  public void releaseAllBytesReserved() {  // ReplicaInPipelineInterface; NOTE(review): not synchronized, unlike setLastChecksumAndDataLen - confirm callers cannot race on bytesReserved
+    getVolume().releaseReservedSpace(bytesReserved);  // hand the full outstanding bytesReserved amount to the volume
+    bytesReserved = 0;  // this replica no longer holds any reservation
+  }
+
   @Override // ReplicaInPipelineInterface
   public synchronized void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) {
     this.bytesOnDisk = dataLength;

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java

@@ -44,6 +44,11 @@ public interface ReplicaInPipelineInterface extends Replica {
    */
   void setBytesAcked(long bytesAcked);
   
+  /**
+   * Release any disk space still reserved for this replica, e.g. after a failed write.
+   */
+  public void releaseAllBytesReserved();
+
   /**
    * store the checksum for the last chunk along with the data length
    * @param dataLength number of bytes on disk

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -289,6 +289,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       }
     }
 
+    @Override
+    public void releaseAllBytesReserved() {  // no-op in this test implementation
+    }
+
     @Override
     synchronized public long getBytesOnDisk() {
       if (finalized) {

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java

@@ -40,6 +40,10 @@ public class ExternalReplicaInPipeline implements ReplicaInPipelineInterface {
   public void setBytesAcked(long bytesAcked) {
   }
 
+  @Override
+  public void releaseAllBytesReserved() {  // no-op in this test implementation
+  }
+
   @Override
   public void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) {
   }

+ 58 - 9
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import com.google.common.base.Supplier;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,6 +46,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.TimeoutException;
 
 /**
  * Ensure that the DN reserves disk space equivalent to a full block for
@@ -53,7 +55,6 @@ import java.util.Random;
 public class TestRbwSpaceReservation {
   static final Log LOG = LogFactory.getLog(TestRbwSpaceReservation.class);
 
-  private static final short REPL_FACTOR = 1;
   private static final int DU_REFRESH_INTERVAL_MSEC = 500;
   private static final int STORAGES_PER_DATANODE = 1;
   private static final int BLOCK_SIZE = 1024 * 1024;
@@ -83,25 +84,38 @@ public class TestRbwSpaceReservation {
     ((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.ALL);
   }
 
-  private void startCluster(int blockSize, long perVolumeCapacity) throws IOException {
+  /**
+   * Start a MiniDFSCluster with {@code numDatanodes} DataNodes.
+   * @param blockSize block size passed to initConfig
+   * @param perVolumeCapacity limit the capacity of each volume to the given
+   *                          value. If negative, then don't limit.
+   * @throws IOException
+   */
+  private void startCluster(int blockSize, int numDatanodes, long perVolumeCapacity) throws IOException {
     initConfig(blockSize);
 
     cluster = new MiniDFSCluster
         .Builder(conf)
         .storagesPerDatanode(STORAGES_PER_DATANODE)
-        .numDataNodes(REPL_FACTOR)
+        .numDataNodes(numDatanodes)
         .build();
     fs = cluster.getFileSystem();
     client = fs.getClient();
     cluster.waitActive();
 
     if (perVolumeCapacity >= 0) {
+      for (DataNode dn : cluster.getDataNodes()) {
+        for (FsVolumeSpi volume : dn.getFSDataset().getVolumes()) {
+          ((FsVolumeImpl) volume).setCapacityForTesting(perVolumeCapacity);
+        }
+      }
+    }
+
+    if (numDatanodes == 1) {
       List<? extends FsVolumeSpi> volumes =
           cluster.getDataNodes().get(0).getFSDataset().getVolumes();
-
       assertThat(volumes.size(), is(1));
       singletonVolume = ((FsVolumeImpl) volumes.get(0));
-      singletonVolume.setCapacityForTesting(perVolumeCapacity);
     }
   }
 
@@ -128,7 +142,7 @@ public class TestRbwSpaceReservation {
       throws IOException, InterruptedException {
     // Enough for 1 block + meta files + some delta.
     final long configuredCapacity = fileBlockSize * 2 - 1;
-    startCluster(BLOCK_SIZE, configuredCapacity);
+    startCluster(BLOCK_SIZE, 1, configuredCapacity);
     FSDataOutputStream out = null;
     Path path = new Path("/" + fileNamePrefix + ".dat");
 
@@ -195,7 +209,7 @@ public class TestRbwSpaceReservation {
   @Test (timeout=300000)
   public void testWithLimitedSpace() throws IOException {
     // Cluster with just enough space for a full block + meta.
-    startCluster(BLOCK_SIZE, 2 * BLOCK_SIZE - 1);
+    startCluster(BLOCK_SIZE, 1, 2 * BLOCK_SIZE - 1);
     final String methodName = GenericTestUtils.getMethodName();
     Path file1 = new Path("/" + methodName + ".01.dat");
     Path file2 = new Path("/" + methodName + ".02.dat");
@@ -208,7 +222,6 @@ public class TestRbwSpaceReservation {
       os2 = fs.create(file2);
 
       // Write one byte to the first file.
-      LOG.info("arpit: writing first file");
       byte[] data = new byte[1];
       os1.write(data);
       os1.hsync();
@@ -227,6 +240,42 @@ public class TestRbwSpaceReservation {
     }
   }
 
+  /**
+   * Ensure that reserved space is released when the client goes away
+   * unexpectedly.
+   *
+   * The verification is done for each replica in the write pipeline.
+   *
+   * @throws IOException
+   */
+  @Test(timeout=300000)
+  public void testSpaceReleasedOnUnexpectedEof()
+      throws IOException, InterruptedException, TimeoutException {
+    final short replication = 3;
+    startCluster(BLOCK_SIZE, replication, -1);  // one DataNode per replica; negative capacity means volumes are not limited
+
+    final String methodName = GenericTestUtils.getMethodName();
+    final Path file = new Path("/" + methodName + ".01.dat");
+
+    // Write 1 byte to the file and kill the writer.
+    FSDataOutputStream os = fs.create(file, replication);
+    os.write(new byte[1]);
+    os.hsync();
+    DFSTestUtil.abortStream((DFSOutputStream) os.getWrappedStream());  // the stream is never closed normally in this test
+
+    // Ensure all space reserved for the replica was released on each
+    // DataNode.
+    for (DataNode dn : cluster.getDataNodes()) {
+      final FsVolumeImpl volume = (FsVolumeImpl) dn.getFSDataset().getVolumes().get(0);  // cluster is built with STORAGES_PER_DATANODE (1) storages per DataNode
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return (volume.getReservedForRbw() == 0);  // done once no RBW reservation remains on this volume
+        }
+      }, 500, Integer.MAX_VALUE); // No practical wait limit here: wait until the @Test timeout fires.
+    }
+  }
+
   /**
    * Stress test to ensure we are not leaking reserved space.
    * @throws IOException
@@ -235,7 +284,7 @@ public class TestRbwSpaceReservation {
   @Test (timeout=600000)
   public void stressTest() throws IOException, InterruptedException {
     final int numWriters = 5;
-    startCluster(SMALL_BLOCK_SIZE, SMALL_BLOCK_SIZE * numWriters * 10);
+    startCluster(SMALL_BLOCK_SIZE, 1, SMALL_BLOCK_SIZE * numWriters * 10);
     Writer[] writers = new Writer[numWriters];
 
     // Start a few writers and let them run for a while.