瀏覽代碼

HDFS-6955. DN should reserve disk space for a full block when creating tmp files (Contributed by Kanaka Kumar Avvaru)

Vinayakumar B 9 年之前
父節點
當前提交
92c1af1646

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -117,7 +117,7 @@ class BlockReceiver implements Closeable {
   /** the block to receive */
   private final ExtendedBlock block; 
   /** the replica to write */
-  private final ReplicaInPipelineInterface replicaInfo;
+  private ReplicaInPipelineInterface replicaInfo;
   /** pipeline stage */
   private final BlockConstructionStage stage;
   private final boolean isTransfer;
@@ -259,6 +259,9 @@ class BlockReceiver implements Closeable {
     } catch (ReplicaNotFoundException bne) {
       throw bne;
     } catch(IOException ioe) {
+      if (replicaInfo != null) {
+        replicaInfo.releaseAllBytesReserved();
+      }
       IOUtils.closeStream(this);
       cleanupBlock();
       

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java

@@ -62,13 +62,13 @@ public interface FsVolumeSpi {
   boolean isTransientStorage();
 
   /**
-   * Reserve disk space for an RBW block so a writer does not run out of
-   * space before the block is full.
+   * Reserve disk space for a block (RBW or Re-replicating)
+   * so a writer does not run out of space before the block is full.
    */
-  void reserveSpaceForRbw(long bytesToReserve);
+  void reserveSpaceForReplica(long bytesToReserve);
 
   /**
-   * Release disk space previously reserved for RBW block.
+   * Release disk space previously reserved for block opened for write.
    */
   void releaseReservedSpace(long bytesToRelease);
 

+ 7 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -1157,7 +1157,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     
     // Replace finalized replica by a RBW replica in replicas map
     volumeMap.add(bpid, newReplicaInfo);
-    v.reserveSpaceForRbw(estimateBlockLen - replicaInfo.getNumBytes());
+    v.reserveSpaceForReplica(estimateBlockLen - replicaInfo.getNumBytes());
     return newReplicaInfo;
   }
 
@@ -1487,7 +1487,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           }
           ReplicaInPipeline newReplicaInfo =
               new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v,
-                  f.getParentFile(), 0);
+                  f.getParentFile(), b.getLocalBlock().getNumBytes());
           volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
           return new ReplicaHandler(newReplicaInfo, ref);
         } else {
@@ -1604,7 +1604,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
       // remove from volumeMap
       volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());
-      
+
       // delete the on-disk temp file
       if (delBlockFromDisk(replicaInfo.getBlockFile(), 
           replicaInfo.getMetaFile(), b.getLocalBlock())) {
@@ -2555,14 +2555,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     final long usedSpace; // size of space used by HDFS
     final long freeSpace; // size of free space excluding reserved space
     final long reservedSpace; // size of space reserved for non-HDFS
-    final long reservedSpaceForRBW; // size of space reserved RBW
+    final long reservedSpaceForReplicas; // size of space reserved for RBW or
+                                    // re-replication
 
     VolumeInfo(FsVolumeImpl v, long usedSpace, long freeSpace) {
       this.directory = v.toString();
       this.usedSpace = usedSpace;
       this.freeSpace = freeSpace;
       this.reservedSpace = v.getReserved();
-      this.reservedSpaceForRBW = v.getReservedForRbw();
+      this.reservedSpaceForReplicas = v.getReservedForReplicas();
     }
   }  
 
@@ -2596,7 +2597,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       innerInfo.put("usedSpace", v.usedSpace);
       innerInfo.put("freeSpace", v.freeSpace);
       innerInfo.put("reservedSpace", v.reservedSpace);
-      innerInfo.put("reservedSpaceForRBW", v.reservedSpaceForRBW);
+      innerInfo.put("reservedSpaceForReplicas", v.reservedSpaceForReplicas);
       info.put(v.directory, innerInfo);
     }
     return info;

+ 43 - 29
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java

@@ -22,8 +22,8 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
-import java.nio.channels.ClosedChannelException;
 import java.io.OutputStreamWriter;
+import java.nio.channels.ClosedChannelException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
@@ -40,9 +40,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DF;
@@ -54,21 +51,24 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
-import org.apache.hadoop.util.CloseableReferenceCount;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.CloseableReferenceCount;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.util.Time;
 import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * The underlying volume used to store replica.
  * 
@@ -90,8 +90,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
   private final long reserved;
   private CloseableReferenceCount reference = new CloseableReferenceCount();
 
-  // Disk space reserved for open blocks.
-  private AtomicLong reservedForRbw;
+  // Disk space reserved for blocks (RBW or Re-replicating) open for write.
+  private AtomicLong reservedForReplicas;
+  private long recentReserved = 0;
 
   // Capacity configured. This is useful when we want to
   // limit the visible capacity for tests. If negative, then we just
@@ -113,8 +114,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
     this.reserved = conf.getLong(
         DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
         DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT);
-    this.reservedForRbw = new AtomicLong(0L);
-    this.currentDir = currentDir; 
+    this.reservedForReplicas = new AtomicLong(0L);
+    this.currentDir = currentDir;
     File parent = currentDir.getParentFile();
     this.usage = new DF(parent, conf);
     this.storageType = storageType;
@@ -353,8 +354,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
    */
   @Override
   public long getAvailable() throws IOException {
-    long remaining = getCapacity() - getDfsUsed() - reservedForRbw.get();
-    long available = usage.getAvailable() - reserved - reservedForRbw.get();
+    long remaining = getCapacity() - getDfsUsed() - reservedForReplicas.get();
+    long available = usage.getAvailable() - reserved
+        - reservedForReplicas.get();
     if (remaining > available) {
       remaining = available;
     }
@@ -362,10 +364,15 @@ public class FsVolumeImpl implements FsVolumeSpi {
   }
 
   @VisibleForTesting
-  public long getReservedForRbw() {
-    return reservedForRbw.get();
+  public long getReservedForReplicas() {
+    return reservedForReplicas.get();
   }
-    
+
+  @VisibleForTesting
+  long getRecentReserved() {
+    return recentReserved;
+  }
+
   long getReserved(){
     return reserved;
   }
@@ -412,13 +419,20 @@ public class FsVolumeImpl implements FsVolumeSpi {
    */
   File createTmpFile(String bpid, Block b) throws IOException {
     checkReference();
-    return getBlockPoolSlice(bpid).createTmpFile(b);
+    reserveSpaceForReplica(b.getNumBytes());
+    try {
+      return getBlockPoolSlice(bpid).createTmpFile(b);
+    } catch (IOException exception) {
+      releaseReservedSpace(b.getNumBytes());
+      throw exception;
+    }
   }
 
   @Override
-  public void reserveSpaceForRbw(long bytesToReserve) {
+  public void reserveSpaceForReplica(long bytesToReserve) {
     if (bytesToReserve != 0) {
-      reservedForRbw.addAndGet(bytesToReserve);
+      reservedForReplicas.addAndGet(bytesToReserve);
+      recentReserved = bytesToReserve;
     }
   }
 
@@ -428,14 +442,15 @@ public class FsVolumeImpl implements FsVolumeSpi {
 
       long oldReservation, newReservation;
       do {
-        oldReservation = reservedForRbw.get();
+        oldReservation = reservedForReplicas.get();
         newReservation = oldReservation - bytesToRelease;
         if (newReservation < 0) {
-          // Failsafe, this should never occur in practice, but if it does we don't
-          // want to start advertising more space than we have available.
+          // Failsafe, this should never occur in practice, but if it does we
+          // don't want to start advertising more space than we have available.
           newReservation = 0;
         }
-      } while (!reservedForRbw.compareAndSet(oldReservation, newReservation));
+      } while (!reservedForReplicas.compareAndSet(oldReservation,
+          newReservation));
     }
   }
 
@@ -779,7 +794,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
    */
   File createRbwFile(String bpid, Block b) throws IOException {
     checkReference();
-    reserveSpaceForRbw(b.getNumBytes());
+    reserveSpaceForReplica(b.getNumBytes());
     try {
       return getBlockPoolSlice(bpid).createRbwFile(b);
     } catch (IOException exception) {
@@ -790,16 +805,15 @@ public class FsVolumeImpl implements FsVolumeSpi {
 
   /**
    *
-   * @param bytesReservedForRbw Space that was reserved during
+   * @param bytesReserved Space that was reserved during
    *     block creation. Now that the block is being finalized we
    *     can free up this space.
    * @return
    * @throws IOException
    */
-  File addFinalizedBlock(String bpid, Block b,
-                         File f, long bytesReservedForRbw)
+  File addFinalizedBlock(String bpid, Block b, File f, long bytesReserved)
       throws IOException {
-    releaseReservedSpace(bytesReservedForRbw);
+    releaseReservedSpace(bytesReserved);
     return getBlockPoolSlice(bpid).addFinalizedBlock(b, f);
   }
 

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -492,7 +492,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     }
 
     @Override
-    public void reserveSpaceForRbw(long bytesToReserve) {
+    public void reserveSpaceForReplica(long bytesToReserve) {
     }
 
     @Override

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java

@@ -612,7 +612,7 @@ public class TestDirectoryScanner {
     }
 
     @Override
-    public void reserveSpaceForRbw(long bytesToReserve) {
+    public void reserveSpaceForReplica(long bytesToReserve) {
     }
 
     @Override

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java

@@ -74,7 +74,7 @@ public class ExternalVolumeImpl implements FsVolumeSpi {
   }
 
   @Override
-  public void reserveSpaceForRbw(long bytesToReserve) {
+  public void reserveSpaceForReplica(long bytesToReserve) {
   }
 
   @Override

+ 137 - 13
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java → hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java

@@ -28,8 +28,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.*;
@@ -60,10 +62,10 @@ import javax.management.ObjectName;
 
 /**
  * Ensure that the DN reserves disk space equivalent to a full block for
- * replica being written (RBW).
+ * replica being written (RBW) and a replica being copied from another DN.
  */
-public class TestRbwSpaceReservation {
-  static final Log LOG = LogFactory.getLog(TestRbwSpaceReservation.class);
+public class TestSpaceReservation {
+  static final Log LOG = LogFactory.getLog(TestSpaceReservation.class);
 
   private static final int DU_REFRESH_INTERVAL_MSEC = 500;
   private static final int STORAGES_PER_DATANODE = 1;
@@ -165,14 +167,14 @@ public class TestRbwSpaceReservation {
       int bytesWritten = buffer.length;
 
       // Check that space was reserved for a full block minus the bytesWritten.
-      assertThat(singletonVolume.getReservedForRbw(),
+      assertThat(singletonVolume.getReservedForReplicas(),
                  is((long) fileBlockSize - bytesWritten));
       out.close();
       out = null;
 
       // Check that the reserved space has been released since we closed the
       // file.
-      assertThat(singletonVolume.getReservedForRbw(), is(0L));
+      assertThat(singletonVolume.getReservedForReplicas(), is(0L));
 
       // Reopen the file for appends and write 1 more byte.
       out = fs.append(path);
@@ -182,7 +184,7 @@ public class TestRbwSpaceReservation {
 
       // Check that space was again reserved for a full block minus the
       // bytesWritten so far.
-      assertThat(singletonVolume.getReservedForRbw(),
+      assertThat(singletonVolume.getReservedForReplicas(),
                  is((long) fileBlockSize - bytesWritten));
 
       // Write once again and again verify the available space. This ensures
@@ -191,7 +193,7 @@ public class TestRbwSpaceReservation {
       out.write(buffer);
       out.hsync();
       bytesWritten += buffer.length;
-      assertThat(singletonVolume.getReservedForRbw(),
+      assertThat(singletonVolume.getReservedForReplicas(),
                  is((long) fileBlockSize - bytesWritten));
     } finally {
       if (out != null) {
@@ -282,7 +284,7 @@ public class TestRbwSpaceReservation {
         GenericTestUtils.waitFor(new Supplier<Boolean>() {
           @Override
           public Boolean get() {
-            return (volume.getReservedForRbw() == 0);
+            return (volume.getReservedForReplicas() == 0);
           }
         }, 500, Integer.MAX_VALUE); // Wait until the test times out.
       }
@@ -324,12 +326,30 @@ public class TestRbwSpaceReservation {
     }
 
     // Ensure RBW space reserved is released
-    assertTrue("Expected ZERO but got " + fsVolumeImpl.getReservedForRbw(),
-        fsVolumeImpl.getReservedForRbw() == 0);
+    assertTrue(
+        "Expected ZERO but got " + fsVolumeImpl.getReservedForReplicas(),
+        fsVolumeImpl.getReservedForReplicas() == 0);
+
+    // Reserve some bytes to verify double clearing space shouldn't happen
+    fsVolumeImpl.reserveSpaceForReplica(1000);
+    try {
+      // Write 1 byte to the file
+      FSDataOutputStream os = fs.create(new Path("/" + methodName + ".02.dat"),
+          replication);
+      os.write(new byte[1]);
+      os.hsync();
+      os.close();
+      fail("Expecting IOException file creation failure");
+    } catch (IOException e) {
+      // Exception can be ignored (expected)
+    }
+
+    // Ensure RBW space reserved is released only once
+    assertTrue(fsVolumeImpl.getReservedForReplicas() == 1000);
   }
 
   @Test(timeout = 30000)
-  public void testRBWInJMXBean() throws Exception {
+  public void testReservedSpaceInJMXBean() throws Exception {
 
     final short replication = 1;
     startCluster(BLOCK_SIZE, replication, -1);
@@ -348,7 +368,111 @@ public class TestRbwSpaceReservation {
       final String volumeInfo = (String) mbs.getAttribute(mxbeanName,
           "VolumeInfo");
 
-      assertTrue(volumeInfo.contains("reservedSpaceForRBW"));
+      // verify reserved space for Replicas in JMX bean volume info
+      assertTrue(volumeInfo.contains("reservedSpaceForReplicas"));
+    }
+  }
+
+  @Test(timeout = 300000)
+  public void testTmpSpaceReserve() throws Exception {
+
+    final short replication = 2;
+    startCluster(BLOCK_SIZE, replication, -1);
+    final int byteCount1 = 100;
+    final int byteCount2 = 200;
+
+    final String methodName = GenericTestUtils.getMethodName();
+
+    // Test positive scenario
+    {
+      final Path file = new Path("/" + methodName + ".01.dat");
+
+      try (FSDataOutputStream os = fs.create(file, (short) 1)) {
+        // Write test data to the file
+        os.write(new byte[byteCount1]);
+        os.hsync();
+      }
+
+      BlockLocation[] blockLocations = fs.getFileBlockLocations(file, 0, 10);
+      String firstReplicaNode = blockLocations[0].getNames()[0];
+
+      int newReplicaDNIndex = 0;
+      if (firstReplicaNode.equals(cluster.getDataNodes().get(0)
+          .getDisplayName())) {
+        newReplicaDNIndex = 1;
+      }
+
+      FsVolumeImpl fsVolumeImpl = (FsVolumeImpl) cluster.getDataNodes()
+          .get(newReplicaDNIndex).getFSDataset().getFsVolumeReferences().get(0);
+
+      performReReplication(file, true);
+
+      assertEquals("Wrong reserve space for Tmp ", byteCount1,
+          fsVolumeImpl.getRecentReserved());
+
+      assertEquals("Reserved Tmp space is not released", 0,
+          fsVolumeImpl.getReservedForReplicas());
+    }
+
+    // Test when file creation fails
+    {
+      final Path file = new Path("/" + methodName + ".01.dat");
+
+      try (FSDataOutputStream os = fs.create(file, (short) 1)) {
+        // Write test data to the file
+        os.write(new byte[byteCount2]);
+        os.hsync();
+      }
+
+      BlockLocation[] blockLocations = fs.getFileBlockLocations(file, 0, 10);
+      String firstReplicaNode = blockLocations[0].getNames()[0];
+
+      int newReplicaDNIndex = 0;
+      if (firstReplicaNode.equals(cluster.getDataNodes().get(0)
+          .getDisplayName())) {
+        newReplicaDNIndex = 1;
+      }
+
+      BlockPoolSlice blockPoolSlice = Mockito.mock(BlockPoolSlice.class);
+      Mockito.when(blockPoolSlice.createTmpFile((Block) Mockito.any()))
+          .thenThrow(new IOException("Synthetic IO Exception Through MOCK"));
+
+      final FsVolumeImpl fsVolumeImpl = (FsVolumeImpl) cluster.getDataNodes()
+          .get(newReplicaDNIndex).getFSDataset().getFsVolumeReferences().get(0);
+
+      // Reserve some bytes to verify double clearing space shouldn't happen
+      fsVolumeImpl.reserveSpaceForReplica(1000);
+
+      Field field = FsVolumeImpl.class.getDeclaredField("bpSlices");
+      field.setAccessible(true);
+      @SuppressWarnings("unchecked")
+      Map<String, BlockPoolSlice> bpSlices = (Map<String, BlockPoolSlice>) field
+          .get(fsVolumeImpl);
+      bpSlices.put(fsVolumeImpl.getBlockPoolList()[0], blockPoolSlice);
+
+      performReReplication(file, false);
+
+      assertEquals("Wrong reserve space for Tmp ", byteCount2,
+          fsVolumeImpl.getRecentReserved());
+
+      assertEquals("Tmp space is not released OR released twice", 1000,
+          fsVolumeImpl.getReservedForReplicas());
+    }
+  }
+
+  private void performReReplication(Path filePath, boolean waitForSuccess)
+      throws Exception {
+    fs.setReplication(filePath, (short) 2);
+
+    Thread.sleep(4000);
+    BlockLocation[] blockLocations = fs.getFileBlockLocations(filePath, 0, 10);
+
+    if (waitForSuccess) {
+      // Wait for the re-replication
+      while (blockLocations[0].getNames().length < 2) {
+        Thread.sleep(2000);
+        blockLocations = fs.getFileBlockLocations(filePath, 0, 10);
+      }
     }
   }
 
@@ -387,7 +511,7 @@ public class TestRbwSpaceReservation {
              " files and hit " + numFailures + " failures");
 
     // Check no space was leaked.
-    assertThat(singletonVolume.getReservedForRbw(), is(0L));
+    assertThat(singletonVolume.getReservedForReplicas(), is(0L));
   }
 
   private static class Writer extends Daemon {